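This post covers how to use EventBus and walks through its source code.

#### What is EventBus

- A publish/subscribe event bus library built for Android (it also runs on plain Java)
- Used for communication between Android components
- ...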
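Existing ways to communicate between threads in the same process:

- Handler
- Shared static variables
- EventBus

Between processes:

- AIDL
- Socket
- Broadcast
- ContentProvider
- Messenger

AIDL and broadcasts also work within a single process, but they are rarely used for in-process communication.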
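
#### Why use EventBus

Compared with Handler: Handler communication is one-way. A thread without a Handler can send messages to the side that owns the Handler. The common case is a worker thread sending a message to the main thread (after a long-running task, it notifies the main thread to update the UI). For the main thread to send messages to a worker thread, the worker needs its own Looper and Handler, and the main thread must hold a reference to that Handler. HandlerThread (a Thread that already has a Looper) is the recommended way to do this. Either way, it takes a fair amount of code.

Shared static variables are not recommended because they keep memory alive for as long as the class stays loaded.

When EventBus delivers an event from a worker thread to the main thread, it still uses a Handler under the hood, but it is much simpler than using a Handler directly. A detailed analysis follows.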
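
#### Usage

1. Define a POJO to act as the message entity class
2. Register the receiving class and define the receiving method in it
3. Post the message
4. Unregister at an appropriate point

Example:

Entity class: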

```java
package com.example.kj_eventbus;

public class EventMessage {
    public String name;

    public EventMessage(String name) {
        this.name = name;
    }
}
```

Receiving class:

```java
public class MainActivity extends AppCompatActivity {
    ...

    // Called on the main thread whenever an EventMessage is posted.
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void eventBus(EventMessage msg) {
        Toast.makeText(this, msg.name, Toast.LENGTH_SHORT).show();
    }

    public void jump(View view) {
        // Guard against registering twice; a second register(this) throws EventBusException.
        if (!EventBus.getDefault().isRegistered(this)) {
            EventBus.getDefault().register(this);
        }
        startActivity(new Intent(this, SendMsgAct.class));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        EventBus.getDefault().unregister(this);
    }
}
```

Posting the message:

```java
public class SendMsgAct extends AppCompatActivity {
    ...

    public void send(View view) {
        // Post from a worker thread; the subscriber still runs on the main thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                EventBus.getDefault().post(new EventMessage("你好朋羽毛!!"));
            }
        }).start();
    }
}
```
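
#### Source code analysis

##### Using the annotation

```java
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    boolean sticky() default false;

    int priority() default 0;
}

public enum ThreadMode {
    POSTING,
    MAIN,
    MAIN_ORDERED,
    BACKGROUND,
    ASYNC
}
```

The annotation is retained at runtime and applies to methods. It declares the thread on which the event is received, the priority of the subscriber, and whether the subscription is sticky.

With this annotation, the (class, method, event type) mapping can be obtained at runtime through reflection.

The relevant code is analyzed later.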
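
To make the "annotation plus reflection" idea concrete, here is a minimal plain-Java sketch. It does not use EventBus itself: `OnEvent`, `Receiver`, and `scan` are hypothetical names invented for illustration, and `OnEvent` is only a stand-in for `@Subscribe`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class AnnotationScanSketch {
    // Hypothetical stand-in for @Subscribe; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface OnEvent {
        int priority() default 0;
    }

    static class Receiver {
        @OnEvent(priority = 1)
        public void onText(String text) { }

        // Not annotated, so the scan must skip it.
        public void notASubscriber(String text) { }
    }

    // Returns "methodName(EventType)" for every public, annotated, one-argument method.
    static List<String> scan(Class<?> type) {
        List<String> found = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            boolean isPublic = Modifier.isPublic(m.getModifiers());
            if (isPublic && m.getParameterCount() == 1 && m.isAnnotationPresent(OnEvent.class)) {
                found.add(m.getName() + "(" + m.getParameterTypes()[0].getSimpleName() + ")");
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(Receiver.class)); // prints [onText(String)]
    }
}
```

The single parameter type of the annotated method is what plays the role of the event type in the mapping.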

##### Initialization

The EventBus instance is built using the singleton and builder patterns. But why is the constructor public? If you know, please tell me~ (The Javadoc on the constructor describes each instance as a separate scope in which events are delivered, so it is probably public to let you create independent buses next to the default one.)

```java
static volatile EventBus defaultInstance;

// Double-checked locking singleton.
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}

public EventBus() {
    this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
    ...
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

public static EventBusBuilder builder() {
    return new EventBusBuilder();
}
```
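
As a rough illustration of how a double-checked-locking singleton, a builder, and a public constructor can coexist, here is a small plain-Java sketch. `BusConfigSketch` and its single `eventInheritance` option are hypothetical and heavily simplified compared with the real class.

```java
class BusConfigSketch {
    // volatile is required for double-checked locking to be safe.
    private static volatile BusConfigSketch defaultInstance;

    final boolean eventInheritance;

    static class Builder {
        boolean eventInheritance = true;

        Builder eventInheritance(boolean value) {
            this.eventInheritance = value;
            return this;
        }

        BusConfigSketch build() {
            return new BusConfigSketch(this);
        }
    }

    // Public constructor: callers may create an independent instance with default settings.
    public BusConfigSketch() {
        this(new Builder());
    }

    BusConfigSketch(Builder builder) {
        this.eventInheritance = builder.eventInheritance;
    }

    static BusConfigSketch getDefault() {
        if (defaultInstance == null) {
            synchronized (BusConfigSketch.class) {
                if (defaultInstance == null) {
                    defaultInstance = new BusConfigSketch();
                }
            }
        }
        return defaultInstance;
    }
}
```

In this sketch the shared instance comes from `getDefault()`, while `new BusConfigSketch()` and `new BusConfigSketch.Builder()...build()` produce separate, independently configured instances.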

##### Registering a subscriber

```java
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

// Wraps one method that receives events.
public class SubscriberMethod {
    final Method method;
    final ThreadMode threadMode;
    final Class<?> eventType;
    final int priority;
    final boolean sticky;
    String methodString;
}

// Pairs a subscriber object with one of its subscriber methods.
class Subscription {
    final Object subscriber;
    final SubscriberMethod subscriberMethod;
}
```

This method is the entry point for registration. It does two things:

1. It calls findSubscriberMethods to collect the SubscriberMethod objects (wrappers around the event-receiving methods) of the class.
2. It calls subscribe to populate subscriptionsByEventType (maps an event type to its Subscriptions) and typesBySubscriber (maps a subscriber to the event types it receives).

Let's follow findSubscriberMethods first:

```java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        ...
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}
```
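
The method checks the cache first and otherwise looks the methods up through the APT-generated index or through reflection. By default ignoreGeneratedIndex is false, so findUsingInfo() is called. When no generated index is available, findUsingInfo() itself falls back to reflection, and that is the path we follow first: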

```java
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            // Index available: use the precomputed subscriber methods.
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            // No index: scan this class with reflection.
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}
```

Without an index we end up in findUsingReflectionInSingleClass:

```java
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    ...
    methods = findState.clazz.getDeclaredMethods();
    ...
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        // Must be public and must not be static, abstract, etc.
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            // Must take exactly one parameter: the event.
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            }
            ...
        }
        ...
    }
}
```

Finally, getMethodsAndRelease is called:

```java
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
    List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
    findState.recycle();
    // Return the FindState to the pool so it can be reused.
    synchronized (FIND_STATE_POOL) {
        for (int i = 0; i < POOL_SIZE; i++) {
            if (FIND_STATE_POOL[i] == null) {
                FIND_STATE_POOL[i] = findState;
                break;
            }
        }
    }
    return subscriberMethods;
}
```

What comes back is the list of event-receiving methods in the registered class.

Now for the second step, subscribe:

```java
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        ...
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {
        ...
    }
}
```

That completes the analysis of register.
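
The bookkeeping done by subscribe can be pictured with a small plain-Java sketch. This is a simplification under assumptions: `SubscribeSketch`, `Sub`, and the `name` field are hypothetical, and a `Sub` stands in for a Subscription with its priority. It shows the priority-ordered insert and the two maps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

class SubscribeSketch {
    static final class Sub {
        final Object subscriber;
        final String name;   // hypothetical label, only used to show the resulting order
        final int priority;

        Sub(Object subscriber, String name, int priority) {
            this.subscriber = subscriber;
            this.name = name;
            this.priority = priority;
        }
    }

    // event type -> subscriptions, highest priority first
    final Map<Class<?>, CopyOnWriteArrayList<Sub>> byEventType = new HashMap<>();
    // subscriber -> event types it listens to (what an unregister step would walk)
    final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized void subscribe(Object subscriber, Class<?> eventType, String name, int priority) {
        CopyOnWriteArrayList<Sub> subs =
                byEventType.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>());
        int size = subs.size();
        // Insert before the first entry with a strictly lower priority, else append.
        for (int i = 0; i <= size; i++) {
            if (i == size || priority > subs.get(i).priority) {
                subs.add(i, new Sub(subscriber, name, priority));
                break;
            }
        }
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    List<String> order(Class<?> eventType) {
        List<String> names = new ArrayList<>();
        for (Sub s : byEventType.getOrDefault(eventType, new CopyOnWriteArrayList<>())) {
            names.add(s.name);
        }
        return names;
    }
}
```

Because the comparison is strict, subscribers with equal priority keep their registration order.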

##### Posting an event

```java
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        ...
        try {
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

// One PostingThreadState per posting thread.
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}
```
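
The per-thread queue plus the isPosting flag means that an event posted from inside a subscriber is queued and delivered after the current one instead of being delivered recursively. A minimal plain-Java sketch of that behaviour, with hypothetical names (`PostingQueueSketch`, `demo`) and a single `Consumer` standing in for all subscribers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PostingQueueSketch {
    static final class PostingState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
    }

    // Each thread gets its own queue and flag, so no locking is needed here.
    private final ThreadLocal<PostingState> state = ThreadLocal.withInitial(PostingState::new);
    private final Consumer<Object> subscriber;

    PostingQueueSketch(Consumer<Object> subscriber) {
        this.subscriber = subscriber;
    }

    void post(Object event) {
        PostingState s = state.get();
        s.eventQueue.add(event);
        if (!s.isPosting) {            // a nested post() only enqueues and returns
            s.isPosting = true;
            try {
                while (!s.eventQueue.isEmpty()) {
                    subscriber.accept(s.eventQueue.remove(0));
                }
            } finally {
                s.isPosting = false;
            }
        }
    }

    // Posts "a"; the subscriber posts "b" while handling "a".
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PostingQueueSketch[] bus = new PostingQueueSketch[1];
        bus[0] = new PostingQueueSketch(event -> {
            log.add("start " + event);
            if ("a".equals(event)) {
                bus[0].post("b");
            }
            log.add("end " + event);
        });
        bus[0].post("a");
        return log; // [start a, end a, start b, end b]
    }
}
```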
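
To understand the isMainThread() call inside post(), we have to start from how mainThreadSupport is initialized.

Step into isMainThread():

```java
private boolean isMainThread() {
    return mainThreadSupport != null ? mainThreadSupport.isMainThread() : true;
}
```

The result depends on mainThreadSupport, so next we find where mainThreadSupport is initialized: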

```java
EventBus(EventBusBuilder builder) {
    ...
    mainThreadSupport = builder.getMainThreadSupport();
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
    ...
}
```

Step into getMainThreadSupport:

```java
MainThreadSupport getMainThreadSupport() {
    if (mainThreadSupport != null) {
        return mainThreadSupport;
    } else if (Logger.AndroidLogger.isAndroidLogAvailable()) {
        Object looperOrNull = getAndroidMainLooperOrNull();
        return looperOrNull == null ? null :
                new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
    } else {
        return null;
    }
}
```

Let's look at Logger.AndroidLogger.isAndroidLogAvailable():

```java
public static class AndroidLogger implements Logger {
    static final boolean ANDROID_LOG_AVAILABLE;

    static {
        boolean android = false;
        try {
            android = Class.forName("android.util.Log") != null;
        } catch (ClassNotFoundException e) {
        }
        ANDROID_LOG_AVAILABLE = android;
    }

    public static boolean isAndroidLogAvailable() {
        return ANDROID_LOG_AVAILABLE;
    }
}
```

It returns ANDROID_LOG_AVAILABLE, which is true if the class "android.util.Log" can be loaded and false otherwise. In other words, it is true when running on Android and false when not.

Next, getAndroidMainLooperOrNull():

```java
Object getAndroidMainLooperOrNull() {
    ...
    return Looper.getMainLooper();
    ...
}
```

On a real Android device this returns the main thread's Looper, which is not null,

so getMainThreadSupport returns new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull).

Now look at AndroidHandlerMainThreadSupport:

```java
class AndroidHandlerMainThreadSupport implements MainThreadSupport {

    private final Looper looper;

    public AndroidHandlerMainThreadSupport(Looper looper) {
        this.looper = looper;
    }

    @Override
    public boolean isMainThread() {
        return looper == Looper.myLooper();
    }
    ...
}
```

All it does is store the main thread's Looper in the looper field of AndroidHandlerMainThreadSupport.

Now look at isMainThread() again. If Looper.myLooper() returns the same Looper as the main thread's, the current thread is the main thread. In the example the event is posted from a worker thread, so it returns false here. Back in post():

The eventQueue field of postingState holds the event we posted, and isMainThread is false.

Then, as long as eventQueue is not empty, postSingleEvent(eventQueue.remove(0), postingState) is executed:

```java
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        // Also deliver to subscribers of superclasses and interfaces of the event.
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}
```

eventInheritance defaults to true, so next we step into lookupAllEventTypes:

```java
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
    synchronized (eventTypesCache) {
        List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
        if (eventTypes == null) {
            eventTypes = new ArrayList<>();
            Class<?> clazz = eventClass;
            while (clazz != null) {
                eventTypes.add(clazz);
                addInterfaces(eventTypes, clazz.getInterfaces());
                clazz = clazz.getSuperclass();
            }
            eventTypesCache.put(eventClass, eventTypes);
        }
        return eventTypes;
    }
}
```

It returns a list containing the event type itself, all of its superclasses, and the interfaces they implement.
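
The hierarchy walk is easy to try in isolation. The sketch below is plain Java without the cache; the `addInterfaces` body is my own guess at a reasonable implementation, and `Tagged`, `BaseEvent`, and `LoginEvent` are hypothetical types used only for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;

class EventTypeLookupSketch {
    interface Tagged { }

    static class BaseEvent implements Tagged { }

    static class LoginEvent extends BaseEvent { }

    // Collects the class, its superclasses, and every interface along the way.
    static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        List<Class<?>> types = new ArrayList<>();
        for (Class<?> c = eventClass; c != null; c = c.getSuperclass()) {
            types.add(c);
            addInterfaces(types, c.getInterfaces());
        }
        return types;
    }

    // Adds each interface once, then recurses into its super-interfaces.
    static void addInterfaces(List<Class<?>> types, Class<?>[] interfaces) {
        for (Class<?> i : interfaces) {
            if (!types.contains(i)) {
                types.add(i);
                addInterfaces(types, i.getInterfaces());
            }
        }
    }

    public static void main(String[] args) {
        // LoginEvent, BaseEvent, Tagged, Object (in that order)
        for (Class<?> c : lookupAllEventTypes(LoginEvent.class)) {
            System.out.println(c.getSimpleName());
        }
    }
}
```

This is why a subscriber declared for `Object` or for a base event class also receives events of subclasses when event inheritance is enabled.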

Once lookupAllEventTypes returns, postSingleEvent calls postSingleEventForEventType(event, postingState, clazz):

```java
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
```

This looks up the subscriptions for the event type in subscriptionsByEventType. For each one it stores the subscription in postingState.subscription and the event in postingState.event, and then calls postToSubscription(subscription, event, postingState.isMainThread):

```java
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
```

In the example the threadMode is MAIN and the event is posted from a worker thread, so mainThreadPoster.enqueue(subscription, event) is executed next.
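
The switch above is essentially a decision table. As a reading aid, the plain-Java sketch below restates it as a pure function; the `Mode` and `Route` enums and the `route` method are hypothetical and exist only in this sketch.

```java
class ThreadModeRoutingSketch {
    enum Mode { POSTING, MAIN, MAIN_ORDERED, BACKGROUND, ASYNC }

    enum Route { INVOKE_DIRECTLY, MAIN_POSTER, BACKGROUND_POSTER, ASYNC_POSTER }

    // Mirrors the branches of the switch: which path handles the event?
    static Route route(Mode mode, boolean isMainThread, boolean hasMainPoster) {
        switch (mode) {
            case POSTING:
                return Route.INVOKE_DIRECTLY;            // stay on the posting thread
            case MAIN:
                return isMainThread ? Route.INVOKE_DIRECTLY : Route.MAIN_POSTER;
            case MAIN_ORDERED:
                return hasMainPoster ? Route.MAIN_POSTER : Route.INVOKE_DIRECTLY;
            case BACKGROUND:
                return isMainThread ? Route.BACKGROUND_POSTER : Route.INVOKE_DIRECTLY;
            case ASYNC:
                return Route.ASYNC_POSTER;               // always hand off
            default:
                throw new IllegalStateException("Unknown thread mode: " + mode);
        }
    }
}
```

For the example in this post, `route(Mode.MAIN, false, true)` gives `MAIN_POSTER`, which matches the path followed next.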

To follow this we go back to where mainThreadPoster is initialized (in the EventBus constructor):

```java
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
```

Here is mainThreadSupport.createPoster(this):

```java
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
    ...
    @Override
    public Poster createPoster(EventBus eventBus) {
        return new HandlerPoster(eventBus, looper, 10);
    }
}
```

As noted earlier, this looper is the main thread's Looper. Next we step into new HandlerPoster(eventBus, looper, 10):

```java
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
```
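
HandlerPoster is one of the more important classes in EventBus. It extends Handler, and because the Looper passed in belongs to the main thread, this Handler processes its messages on the main thread.

Now back to where we started: mainThreadPoster.enqueue(subscription, event);

This wraps the subscription and the event in a PendingPost. PendingPost objects are pooled so they can be reused, as shown here: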

```java
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}
```

The idea is simple: if pendingPostPool has a spare object, take it from the pool, and only create a new one when the pool is empty. The PendingPost is then put into the PendingPostQueue. After that, if the handler is not already active, sendMessage(obtainMessage()) sends an empty message to wake it up.
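
The obtain/release cycle of such a pool can be sketched in plain Java as follows. `PendingPoolSketch`, `Pending`, and the cap `MAX_POOL_SIZE` are hypothetical, and the cap value is an arbitrary choice for the sketch rather than the library's number.

```java
import java.util.ArrayList;
import java.util.List;

class PendingPoolSketch {
    static final class Pending {
        Object event;

        Pending(Object event) {
            this.event = event;
        }
    }

    private static final int MAX_POOL_SIZE = 100; // arbitrary cap for this sketch
    private static final List<Pending> pool = new ArrayList<>();

    // Reuse a pooled object when one is available, otherwise allocate.
    static Pending obtain(Object event) {
        synchronized (pool) {
            int size = pool.size();
            if (size > 0) {
                Pending p = pool.remove(size - 1);
                p.event = event;
                return p;
            }
        }
        return new Pending(event);
    }

    // Clear the payload so the pool does not keep events alive, then recycle.
    static void release(Pending p) {
        p.event = null;
        synchronized (pool) {
            if (pool.size() < MAX_POOL_SIZE) {
                pool.add(p);
            }
        }
    }
}
```

Taking from the end of the list keeps both `obtain` and `release` constant-time, and capping the pool stops it from growing without bound after a burst of events.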
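
Next we step into handleMessage. It keeps polling the PendingPostQueue until the queue is empty (or until maxMillisInsideHandleMessage, 10 ms here, has elapsed, in which case it sends itself another message and returns). For each item it calls eventBus.invokeSubscriber(pendingPost):

```java
void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    // Return the PendingPost to the pool before invoking the subscriber.
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}

void invokeSubscriber(Subscription subscription, Object event) {
    ...
    subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    ...
}
```

This method invokes the event-receiving method through reflection.

That completes the whole path of an event travelling from a worker thread to the main thread.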
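
If the subscriber should run on a worker thread (that is, threadMode is BACKGROUND), there are two cases. When the event is posted from the main thread, backgroundPoster.enqueue(subscription, event) is called to switch to a worker thread. Otherwise the subscriber is invoked directly through reflection. Finally, let's look at how BackgroundPoster is implemented: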

```java
final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }
}
```
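
BackgroundPoster is itself a Runnable that maintains a queue. After enqueuing, it calls eventBus.getExecutorService().execute(this) to submit itself to the thread pool (a cached thread pool by default) unless it is already running. The pool then runs its run method, which keeps polling items from the queue and invoking the subscribers through reflection.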
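
The "queue plus running flag" trick, which makes tasks run one at a time and in order on a shared pool, can be sketched in plain Java. This is a simplified stand-in, not the library class: `SerialBackgroundSketch` and `demo` are hypothetical names, tasks are plain `Runnable`s, the timed poll is dropped, and exception handling is omitted.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerialBackgroundSketch implements Runnable {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final ExecutorService executor;
    private boolean running; // guarded by "this"

    SerialBackgroundSketch(ExecutorService executor) {
        this.executor = executor;
    }

    void enqueue(Runnable task) {
        synchronized (this) {
            queue.add(task);
            if (!running) {          // at most one drain loop is scheduled at a time
                running = true;
                executor.execute(this);
            }
        }
    }

    @Override
    public void run() {
        while (true) {
            Runnable task;
            synchronized (this) {
                task = queue.poll();
                if (task == null) {  // nothing left: let the next enqueue reschedule us
                    running = false;
                    return;
                }
            }
            task.run();              // run outside the lock
        }
    }

    // Enqueues 0..4 and returns the order in which they ran.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        SerialBackgroundSketch poster = new SerialBackgroundSketch(pool);
        List<Integer> seen = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 5; i++) {
            final int n = i;
            poster.enqueue(() -> seen.add(n));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Checking and setting the flag under the same lock as the queue is what prevents a task from being enqueued just as the drain loop decides to stop.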
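
##### Reflection optimization: APT

Using reflection at runtime costs some performance. With APT (the annotation processing tool), the corresponding code is generated at compile time, which avoids scanning every method for annotations at runtime.

#### Summary

Essence: when an event is posted, a specific method of a specific class is executed.

Principle: through annotations and reflection, registering a class builds a (class, method, event type) mapping table that is kept inside EventBus. When an event is posted, EventBus looks up this table and invokes the matching method of the matching class through reflection. Thread switching is done with a Handler and a thread pool.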
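
To tie the pieces together, here is a deliberately tiny plain-Java event bus that follows the same principle: annotation, reflection scan at registration, a mapping table, and reflective invocation at post time. Everything in it (`MiniBusSketch`, `OnEvent`, `Inbox`) is hypothetical, and it leaves out thread switching, priorities, sticky events, and event inheritance.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class MiniBusSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface OnEvent { }

    private static final class Subscription {
        final Object subscriber;
        final Method method;

        Subscription(Object subscriber, Method method) {
            this.subscriber = subscriber;
            this.method = method;
        }
    }

    // The mapping table: event type -> (subscriber, method) pairs.
    private final Map<Class<?>, List<Subscription>> byEventType = new ConcurrentHashMap<>();

    public void register(Object subscriber) {
        for (Method m : subscriber.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(OnEvent.class) && m.getParameterCount() == 1) {
                m.setAccessible(true);
                byEventType.computeIfAbsent(m.getParameterTypes()[0], k -> new CopyOnWriteArrayList<>())
                        .add(new Subscription(subscriber, m));
            }
        }
    }

    public void unregister(Object subscriber) {
        for (List<Subscription> subs : byEventType.values()) {
            subs.removeIf(s -> s.subscriber == subscriber);
        }
    }

    // Returns how many subscriber methods received the event.
    public int post(Object event) {
        List<Subscription> subs = byEventType.get(event.getClass());
        if (subs == null) {
            return 0;
        }
        int delivered = 0;
        for (Subscription s : subs) {
            try {
                s.method.invoke(s.subscriber, event);
                delivered++;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        return delivered;
    }

    static class Inbox {
        final List<String> received = new ArrayList<>();

        @OnEvent
        public void onText(String text) {
            received.add(text);
        }
    }
}
```

A possible usage: create a `MiniBusSketch`, `register` an `Inbox`, and call `post("hi")`. The `Inbox` then holds `"hi"` in `received`, and posting an `Integer` reaches nobody because no method is mapped to that type.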
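
Finally: EventBus touches on many topics, including design patterns, thread pools, Handler, ThreadLocal, reflection, annotations, and multithreading. It is worth at least skimming the source code.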