EventBus从使用到源码分析
EventBus是greenrobot在Android平台发布的以订阅-发布模式为核心的开源库。EventBus的库主要用来进行各个组件,各个线程之间的通信。在Android中五大组件之间的通信可以采用Intent,Handler,广播等。但是这样会使得代码复杂,使用EventBus可以极大简化组件之间的通信,而且效率高。
EventBus不是单例模式,我们一般使用它的单例模式是因为我们的订阅者和事件可以在一条总线上收发,但是如果使用多条总线,不同总线上发出的event其他总线是接受不到的。如果系统很复杂我们可以自己new一个bus出来
本文主要介绍了EventBus的使用和源码的分析
github地址:
https://github.com/greenrobot/EventBus
参考博客
http://www.jianshu.com/p/acfe78296bb5
参考博客的文章写的很好了,自己再写是对EventBus源码的学习总结
1.EventBus的使用
1.1 github的教程
compile 'org.greenrobot:eventbus:3.1.1'
1.2 一个简单的demo
1.定义一个事件类型
public class LeoEvent {
private String msg;
public LeoEvent(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
2.注册观察者(Activity类)
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_leo_main);
EventBus.getDefault().register(this);
btn = findViewById(R.id.btn_leo_start);
tv = findViewById(R.id.tv_leo_content);
btn.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Intent intent = new Intent(LeoMainActivity.this , LeoSecondActivity.class);
startActivity(intent);
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onResponse(LeoEvent msg){
tv.setText(msg.getMsg());
Toast.makeText(this, "receiver ok ... \n" + msg.getMsg(), Toast.LENGTH_SHORT).show();
}
3.发布事件
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_leo_second);
findViewById(R.id.btn_leo_post).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
EventBus.getDefault().post(new LeoEvent("hello: "+count));
count++;
}
});
}
1.3订阅方法的注解@Subscribe
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false;
/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0;
}
threadMode代表了订阅方法运行的线程
sticky代表是否是粘性事件,和粘性广播一样只会接收到最近发送的一次粘性事件,之前的会接受不到
priority代表优先级
public enum ThreadMode {
POSTING, //表示订阅方法运行在发送事件的线程,默认值
MAIN, //表示订阅方法运行在UI线程,由于UI线程不能阻塞,因此当使用MAIN的时候,订阅方法不应该耗时过长。
BACKGROUND, //表示订阅方法运行在后台线程
ASYNC //订阅方法与发送事件始终不在同一个线程,即订阅方法始终会使用新的线程来运行。
}
2.EventBus源码
EventBus的源码比较复杂,我们仅仅通过EventBus调用的几个方法为线索来进行主线路的源码阅读。
2.1 EventBus类,通过EventBus.getDefault()
/**
* by leo
* 单例模式
* */
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
通过单例模式来确保我们默认使用的Bus是唯一的,其实EventBus类可以new出来。
下面来看EventBus的构造函数
public EventBus() {
this(DEFAULT_BUILDER);
}
//使用了建造者模式,一个复杂对象的构造与它的表示分离,使得通过不同的builder可以构造出不同的对象
EventBus(EventBusBuilder builder) {
...
//以event(即事件类)为key,以订阅列表(Subscription)为value
subscriptionsByEventType = new HashMap<>();
//以订阅者类为key,以event事件类为value
typesBySubscriber = new HashMap<>();
//保存的是粘性事件
stickyEvents = new ConcurrentHashMap<>();
...//一系列的builder方法赋值
}
subscriptionsByEventType变量,以定义的事件类为key,订阅方法的一个封装类的集合为value
typesBySubscriber变量,以订阅者的类型为key,该订阅者订阅方法中的所有事件的集合为value
上面的感念结合上面的demo做一个具体化,事件类就是LeoEvent类,订阅者就是上面的Activity类,订阅方法就是订阅者中的public void onResponse(LeoEvent msg){}方法
2.2 register(Object subscriber)
public void register(Object subscriber) {
//订阅者的类型,demo就是Activity.class
Class<?> subscriberClass = subscriber.getClass();
//得到订阅者的所有订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//注册观察者模式
subscribe(subscriber, subscriberMethod);
}
}
}
这里主要两个重要的方法,subscriberMethodFinder.findSubscriberMethods(subscriberClass)完成了获取订阅方法;subscribe(subscriber, subscriberMethod)注册所有的订阅方法和对应的订阅者
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//从缓存中取出subscriberMethodss,如果有则直接返回该已取得的方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//默认设为false
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//执行这里
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//把得到的结果使用缓存存储起来,下次再来注册就可以直接取的
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
上面使用了缓存,每次进入该方法先从缓冲中获取数据如果没有再调用findUsingInfo(subscriberClass)获取,得到后加入缓存便于下次获取
/**
* by leo
* 在该类和父类中找到订阅方法找出来
* */
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//准备一个FindState,该FindState保存了订阅者类的信息,findState的获取也是个技巧,下面会分析
FindState findState = prepareFindState();
//对FindState初始化
findState.initForSubscriber(subscriberClass);
//这个while从传入的类一直往上遍历父类直到父类是系统类,以java,javax,android开始的包
while (findState.clazz != null) {
//获得订阅者的信息,一开始会返回null
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// do this
findUsingReflectionInSingleClass(findState);
}
//调到父类,如果父类不是系统类,就将clazz设为父类的所以继续循环遍历父类中的方法
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
首先初始化了一个FindState类,该类保存了一些订阅者的信息,这里用到的技巧最后再说,接下来是个while循环,从该类一直往上遍历父类直到父类是系统类为止,对每个类调用findUsingReflectionInSingleClass(findState)
/**
* by leo
* 订阅方法的三个特点 : public修饰 一个参数 有@subscribe注解
* */
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//很快 比 getMethods()方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//该类的所有方法中找到符合要求的注册方法
for (Method method : methods) {
//获取方法的修饰符,要求是Public的
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取方法的参数类型,参数类型为1个的
Class<?>[] parameterTypes = method.getParameterTypes();
//如果参数个数为一个
if (parameterTypes.length == 1) {
//获取该方法的@Subscribe注解,如果有注解就是我们找到订阅方法
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//参数类型 即为 事件类型
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
//从注解中提取threadMode
ThreadMode threadMode = subscribeAnnotation.threadMode();
//新建一个SubscriberMethod对象,并添加到findState的subscriberMethods这个集合内
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
首先得到类的所有方法,然后根据判断准则把订阅的方法加入集合中,订阅的方法有三个要求:public,一个参数,用@subscribe修饰
然后回到findUsingInfo()方法看最后的函数getMethodsAndRelease()
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
//findState全部还原,接下来的循环中找到null的赋给他
findState.recycle();
...
return subscriberMethods; //订阅方法的集合
}
返回了订阅方法的集合,下面回到register()方法的subscribe()方法,对上面返回的每个方法都调用了一次
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//事件类型
Class<?> eventType = subscriberMethod.eventType;
//把subscriber和subscriberMethod的方法封装成对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 如果该方法的事件类型前面存在过,subscriptionsByEventType会存在他的引用,负责闯进新的subscriptions并存到subscriptionsByEventType会存在他的引用
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//把对象放到集合中
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
根据subscriber(订阅者)来获取它的所有订阅事件
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//下面是对粘性事件的处理
...
}
对EventBus构造方法中的几个map进行操作,把方法放到相应的集合中
2.3 unRegister()
public synchronized void unregister(Object subscriber) {
//根据当前的订阅者来获取它所订阅的所有事件
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
//根据所有的事件类型,都从两个重要的map中取消
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
//取消这个订阅类
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
2.4 post()
public void post(Object event) {
//1、 获取一个postingState,里面有事件的集合,是否在主线程,订阅的方法描述等
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
//将事件加入队列中
eventQueue.add(event);
//第一次肯定是false
if (!postingState.isPosting) {
//判断当前线程是否是主线程
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//只要队列不为空,就不断从队列中获取事件进行分发
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//把队列分发完成后,改postingState recycle
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//获取eventType
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//该eventInheritance上面有提到,默认为true,即EventBus会考虑事件的继承树
//如果事件继承自父类,那么父类也会作为事件被发送
if (eventInheritance) {
//查找该事件的所有父类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍历所有事件
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//通过事件类型得到订阅方法集合
subscriptions = subscriptionsByEventType.get(eventClass);
}
//针对该事件类型有订阅的方法
if (subscriptions != null && !subscriptions.isEmpty()) {
//响应所有的订阅方法
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅方法的@subscribe中的threadMode值来确定不同的方法调用
switch (subscription.subscriberMethod.threadMode) {
//注册的订阅方法和POST在一个线程
case POSTING:
invokeSubscriber(subscription, event);
break;
//注册的订阅方法在MAIN线程
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
//注册的订阅方法与POST时钟不在一个线程
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
终于到了方法的的真正执行地方,我们这里看POSTING和MAIN这种情况,下面逐个分析invokeSubscriber(subscription, event)和mainThreadPoster.enqueue(subscription, event)
void invokeSubscriber(Subscription subscription, Object event) {
try {
//利用了反射
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
利用了反射来进行方法调用,如果使用了MAIN模式,下面进行分析
mainThreadPoster.enqueue(subscription, event);
mainThreadPoster经过分析是new HandlerPoster(eventBus, looper, 10)
public void enqueue(Subscription subscription, Object event) {
//封装了subscription和event的一个对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//这个queue是PendingPostQueue,用来存储PendingPost的一个队列,把上面封装好的pendingPost加入队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//使用handle发送message,不用带任何参数只是一个信号,会到下面的handleMessage中
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
上面使用了handler,每次执行一个post都会封装称为pendingPost(延时Post,对应PendingIntent),这个时候加入一个队列,如何此时没有进行事件分发就使用Handler进行sendMessage,否则下次一块发送,这个时候由于Handler是MainHandLer,下面的handlerMessage就到了UI线程
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
//因为可能存在多个pendingPost
while (true) {
//从队列中获取
PendingPost pendingPost = queue.poll();
//判断是否为null,为null就再次获取
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//具体的操作
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//耗时判断
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
void invokeSubscriber(PendingPost pendingPost) {
//从PendingPost中获取事件类型和订阅方法的封装
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//把该PendingPost回收便于下一次使用,执行了recycler(内部变量设为null),下次不用在new PendingPost
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//调用同一个线程的方法,上面分析过了,就使用了反射
invokeSubscriber(subscription, event);
}
}
3.源码总结
EventBus中两个重要的Map型EventBus变量
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>>subscriptionsByEventType;
subscriptionsByEventType的key就是事件类型,value是Subscription集合,那Subscription又是什么?Subscription对象封装了订阅类和订阅方法,所以这一个对象就可以执行订阅方法
private final Map<Object, List<Class<?>>> typesBySubscriber;
typesBySubscriber的key是订阅类,value是订阅事件的集合
3.1 EventBus.getDefault().register(this)
首先获取该订阅类的所有订阅方法(public , 一个参数 , @subscribe注解就说了该方法是订阅方法);
然后把所有的注册方法遍历和订阅类执行注册方法,就是把得到的订阅方法和订阅类填充到上面的两个map型变量中取
3.2 EventBus.getDefault().unregister(this)
从上面的map中去掉对应的订阅类和订阅方法
3.3 EventBus.getDefault().post(object)
先把event加到到一个队列中去,然后把队列中的event逐个处理,处理的方法就是根据上面的map,event作为key获取所有该event的订阅方法,然后获取到所有的该event订阅方法后就简单了,如果该订阅方法要求是POSTING(同一个线程),就直接使用反射执行该方法就好了。如果订阅方法要求MAIN,可以使用UIHandler把要执行的信号发送,接着handlerMessage就在主线程执行了。
4.阅读代码后的技巧
4.1使用map来充当一个缓存
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//从缓存中取出subscriberMethodss,如果有则直接返回该已取得的方法
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//有缓冲就直接返回
if (subscriberMethods != null) {
return subscriberMethods;
}
...
subscriberMethods = findUsingInfo(subscriberClass);
...
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//把得到的结果使用缓存存储起来,下次再来注册就可以直接取的
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
通过Map类,当前如果存在key的值直接返回,如果没有去执行操作得到结果后以一定key加入map,我们在进行数据库操作短时间为了减少查询的时间消耗可以使用这种方式
4.2 重复使用对象,避免new
public class LeoOptimizeDemo {
/**存储用过的LeoOptimizeDemo,下次需要时如果pool大小大于0,直接从这里给出不用new*/
private final static List<LeoOptimizeDemo> pool = new ArrayList<>();
/**模拟数据*/
private String msg;
public LeoOptimizeDemo(String msg) {
this.msg = msg;
}
/**每次从这里获取该类*/
static LeoOptimizeDemo obtainLeoOptimizeDemo(String msg){
synchronized (pool){
int size = pool.size();
//如果缓冲池有
if(size > 0){
LeoOptimizeDemo leoOptimizeDemo = pool.remove(size-1);
leoOptimizeDemo.msg = msg;
return leoOptimizeDemo;
}
}
//缓冲池没有直接new
return new LeoOptimizeDemo(msg);
}
/**每次使用完该类直接调用该方法清空数据后,加入到pool*/
static void releaseLeoOptimizeDemo(LeoOptimizeDemo leoOptimizeDemo){
leoOptimizeDemo.msg = null;
synchronized (pool){
if(pool.size() < 100 ){
pool.add(leoOptimizeDemo);
}
}
}
}