Принцип реализации платформы сообщений Guava EventBus

Java задняя часть исходный код Android EventBus
Принцип реализации платформы сообщений Guava EventBus

Концепция дизайна EventBus основана на шаблоне наблюдателя, вы можете обратиться кШаблон проектирования (1) — шаблон наблюдателяДавайте начнем с понимания этого шаблона проектирования.

1. Пример программы

Использование EventBus очень просто, сначала вам нужно добавитьGuavaзависит от вашего собственного проекта. Здесь мы проиллюстрируем на базовом примереEveentBusкак это используется.

public static void main(String...args) {
    // 定义一个EventBus对象,这里的Joker是该对象的id
    EventBus eventBus = new EventBus("Joker");
    // 向上述EventBus对象中注册一个监听对象	
    eventBus.register(new EventListener());
    // 使用EventBus发布一个事件,该事件会给通知到所有注册的监听者
    eventBus.post(new Event("Hello every listener, joke begins..."));
}

// 事件,监听者监听的事件的包装对象
public static class Event {
    public String message;
    Event(String message) {
        this.message = message;
    }
}

// 监听者
public static class EventListener {
    // 监听的方法,必须使用注解声明,且只能有一个参数,实际触发一个事件的时候会根据参数类型触发方法
    @Subscribe
    public void listen(Event event) {
        System.out.println("Event listener 1 event.message = " + event.message);
    }
}

Во-первых, здесь мы инкапсулируем объект событияEvent, объект слушателяEventListener. Затем мы используемEventBusконструктор создаетEventBusэкземпляр и зарегистрируйте в нем указанный выше экземпляр слушателя. Затем воспользуемся приведенным вышеEventBusЭкземпляр публикует событиеEvent. Затем использование зарегистрированных выше слушателей@SubscribeАннотация объявляет и имеет только одинEventМетод параметра типа будет запущен при запуске события.

Резюме: из приведенного выше использования мы можем видеть, что разница между EventBus и режимом наблюдателя заключается в том, что когда прослушиватель зарегистрирован, только когда используется метод@SubscribeОбъявление аннотации и параметры соответствуют опубликованному типу события, тогда этот метод будет активирован. Это означает, что один и тот же прослушиватель может прослушивать несколько типов событий, а также несколько раз прослушивать одно и то же событие.

2, анализ исходного кода Eventbus

2.1 Перед анализом

Что ж, в приведенном выше примере мы понимаем самое основное использование EventBus. Ниже мы анализируемGuavaКак этот API реализован у нас. Однако в первую очередь мы все же пытаемся подумать о том, как спроектировать этот API самостоятельно, и задать несколько вопросов, а затем обратиться с вопросами к исходному коду, чтобы найти ответ.

Если мы хотим спроектировать такой API, самый простой способ — расширить шаблон наблюдателя: каждый вызовEventBus.post()Когда метод используется, он проходит через все объекты-наблюдатели, а затем получает все их методы, чтобы определить, используется ли этот метод.@SubscribeИ является ли тип параметра методаpost()Последовательный способ публиковать типы событий, если согласуются, то мы будем использовать этот метод для запуска отражения. В режиме наблюдателя каждый наблюдатель должен реализовывать интерфейс, когда выпуск событий, мы просто вызовите метод интерфейса на линии, но этот предел EventBus более широко, то есть слушателей не нужно реализовывать любой интерфейс, если Используемый метод и параметр аннотация может совпадать.

Как видно из приведенного выше анализа, необходимо пройти не только по всем слушателям, но и по их методам, а затем использовать отражение для запуска этого метода после нахождения подходящего метода. Во-первых, когда количество зарегистрированных слушателей велико, эффективность цепного вызова невысока, тогда нам приходится использовать отражение для запуска метода сопоставления, поэтому эффективность определенно ниже. затем вGuavaизEventBusКак он решает эти две проблемы?

Также обратите внимание на следующее观察者и监听者, слушатель используется для обозначения того, что мы используемEventBus.register()Зарегистрированный объект, наблюдатель является объектом в EventBusSubscriber, Который инкапсулирует всю информацию слушателя, метода слушателя и тому подобное, например. Как правило, мы не работаем напрямуюSubscriberобъект, его права доступа доступны только в пакете EventBus.

2.2 Начать анализ

Во-первых, когда мы используемnewПри инициализации EventBus фактически вызывается следующий метод:

EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
    this.subscribers = new SubscriberRegistry(this);
    this.identifier = (String)Preconditions.checkNotNull(identifier);
    this.executor = (Executor)Preconditions.checkNotNull(executor);
    this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
    this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}

здесьidentifierэто строковый тип, аналогичный идентификатору EventBus;subscribersТип SubscriberRegistry, на самом деле, EventBus добавляет, удаляет и итерирует средство просмотра, которое будет использовать метод этого примера, все наблюдатели также поддерживают информацию в этом примере;executorЭто пул потоков, используемый в процессе распределения событий, который можно реализовать самостоятельно;dispatcherЭто подкласс типа Dispatcher, используемый для рассылки сообщений слушателям при публикации событий, имеет несколько реализаций по умолчанию для разных методов рассылки;exceptionHandlerЭто тип SubscriberExceptionHandler, который используется для обработки информации об исключении.В реализации EventBus по умолчанию журнал будет распечатываться при возникновении исключения.Конечно, мы также можем определить нашу собственную стратегию обработки исключений.

Итак, как видно из приведенного выше анализа, если мы хотим понять, как EventBus регистрируется и не регистрируется, и как выполнять обход для запуска событий, мы должны начать сSubscriberRegistryНачать. В самом деле, я также считаю, что реализация этого класса — самая захватывающая часть EventBus.

2.2.1 SubscriberRegistry

Согласно анализу в 2.1, нам нужно поддерживать несколько отображений в EventBus, чтобы найти и оповестить всех слушателей при публикации событий, в первую очередь事件类型->观察者列表Отображение. Мы сказали выше, события EventBus публикуются для каждого метода, мы будем соответствующей информацией о типе события, и все методы информации хранятся в одном объекте, это наблюдатель в EventBusSubscriber. Затем средство просмотра сопоставляется со списком по типу события при выпуске событий, пока список, по мнению наблюдателей, находит все типы событий и триггеры для отслеживания метода. В SubscriberRegistry это достигается путем сопоставления структуры данных:

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();

Из приведенной выше формы определения мы видим, что здесь используется сопоставление типа класса события со списком подписчиков. Список подписчиков здесь использует коллекцию CopyOnWriteArraySet в Java, Он использует CopyOnWriteArrayList внизу и инкапсулирует его, то есть добавляет операции дедупликации в базовую коллекцию. Это подходящееБольше читайте и меньше пишитеКоллекция сцен, которые не будут блокироваться при чтении данных, Блокировка выполняется при записи данных, выполняется копирование массива.

Теперь, когда мы знаем, что сопоставление будет вставлено в приведенную выше структуру данных во время регистрации в SubscriberRegistry, мы можем посмотреть, как это делается.

в анализеregister()Перед методом давайте рассмотрим несколько методов, которые часто используются внутри SubscriberRegistry, и их принципы тесно связаны с вопросами, которые мы подняли выше. прежде всегоfindAllSubscribers()метод, который используется для получения всех наборов наблюдателей, соответствующих указанному слушателю. Вот его код:

private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    // 创建一个哈希表
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    // 获取监听者的类型
    Class<?> clazz = listener.getClass();
    // 获取上述监听者的全部监听方法
    UnmodifiableIterator var4 = getAnnotatedMethods(clazz).iterator(); // 1
    // 遍历上述方法,并且根据方法和类型参数创建观察者并将其插入到映射表中
    while(var4.hasNext()) {
        Method method = (Method)var4.next();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // 事件类型
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method));
    }
    return methodsInListener;
}

Обратите внимание здесьMultimapСтруктура данных, которая представляет собой структуру множества, предусмотренную в Guava, отличается от обычной хеш-таблицы тем, что может выполнять операции «один ко многим». Это используется для хранения сопоставления типов событий «один ко многим» с наблюдателями. Обратите внимание на код в следующем 1. Выше мы также упоминали, что при регистрации нового слушателя процесс использования отражения для получения всех методов и вынесения суждений очень расточительно снижает производительность, и вот ответ на этот вопрос:

здесьgetAnnotatedMethods()метод попробует изsubscriberMethodsCacheПолучите все методы регистрации слушателей (то есть, используя аннотации и имеющие только один параметр), следующее является определением этого метода:

private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return (ImmutableList)subscriberMethodsCache.getUnchecked(clazz);
}

здесьsubscriberMethodsCacheОпределение:

private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
    public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { // 2
        return SubscriberRegistry.getAnnotatedMethodsNotCached(concreteClass);
    }
});

Механизм действия здесь таков: при употребленииsubscriberMethodsCache.getUnchecked(clazz)При попадании метода в указанный слушатель он сначала попытается получить его из кеша, если его нет в кеше, то выполнит код в 2-х местах. Позвоните SubscriberRegistry вgetAnnotatedMethodsNotCached()метод, чтобы получить эти методы слушателя. Здесь мы опускаем определение этого метода, подробности можно посмотреть в настройках исходного кода, на самом деле использовать рефлексию и выполнять некоторые проверки несложно.

На этом мы закончили анализfindAllSubscribers()метод, разберитесь: при регистрации слушателя вы сначала получите тип слушателя, а затем попытаетесь получить все методы прослушивания, соответствующие слушателю из кеша, если нет, то пройдитесь по методам класса, чтобы получить его , и добавить в кеш; Затем он пройдет через коллекцию методов, полученную выше, создаст наблюдателя на основе типа события (известного из параметров метода), слушателя и другой информации, а также事件类型-观察者Многие пары ключ-значение вставляются в таблицу сопоставления и возвращаются.

Далее, давайте взглянем на EventBus вregister()Код метода:

void register(Object listener) {
    // 获取事件类型-观察者映射表
    Multimap<Class<?>, Subscriber> listenerMethods = this.findAllSubscribers(listener);
    Collection eventMethodsInListener;
    CopyOnWriteArraySet eventSubscribers;
    // 遍历上述映射表并将新注册的观察者映射表添加到全局的subscribers中
	for(Iterator var3 = listenerMethods.asMap().entrySet().iterator(); var3.hasNext(); eventSubscribers.addAll(eventMethodsInListener)) {
        Entry<Class<?>, Collection<Subscriber>> entry = (Entry)var3.next();
        Class<?> eventType = (Class)entry.getKey();
        eventMethodsInListener = (Collection)entry.getValue();
        eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        // 如果指定事件对应的观察者列表不存在就创建一个新的
        if (eventSubscribers == null) {
            CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet();
            eventSubscribers = (CopyOnWriteArraySet)MoreObjects.firstNonNull(this.subscribers.putIfAbsent(eventType, newSet), newSet);
        }
    }
}

в реестре подписчиковregister()метод сunregister()Метод аналогичен, и мы не будем его описывать. Смотрите ниже при звонкеEventBus.post()логика метода. Вот его код:

public void post(Object event) {
    // 调用SubscriberRegistry的getSubscribers方法获取该事件对应的全部观察者
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 使用Dispatcher对事件进行分发
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}

Как видно из вышеуказанного кода, когда вызов на самом делеEventBus.post()Когда используется этот метод, метод getSubscribers из SubscriberRegistry используется для получения всех наблюдателей, соответствующих событию, поэтому сначала нам нужно рассмотреть эту логику. Вот определение метода:

Iterator<Subscriber> getSubscribers(Object event) {
    // 获取事件类型的所有父类型和自身构成的集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); // 3
    List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());
    UnmodifiableIterator var4 = eventTypes.iterator();
    // 遍历上述事件类型,并从subscribers中获取所有的观察者列表
    while(var4.hasNext()) {
        Class<?> eventType = (Class)var4.next();
        CopyOnWriteArraySet<Subscriber> eventSubscribers = (CopyOnWriteArraySet)this.subscribers.get(eventType);
        if (eventSubscribers != null) {
            subscriberIterators.add(eventSubscribers.iterator());
        }
    }
    return Iterators.concat(subscriberIterators.iterator());
}

Обратите внимание на следующие 3 кода, которые используются для получения коллекции всех родительских классов текущего события, включая их собственные типы, то есть, если мы инициируем событие типа Interger, то методы прослушивания таких типов, как поскольку число и объект будут использоваться, можно получить это событие и вызвать его. Логика здесь очень простая, то есть по типу события находим соответствующий этому типу наблюдатель и все его родительские классы и возвращаем его.

2.2.2 Dispatcher

Далее давайте посмотрим, как выглядит реальная логика диспетчеризации событий.

отEventBus.post()Видно, что когда мы используем Dispatcher для распределения событий, нам нужно передать текущее событие и всех наблюдателей в метод в качестве параметров. Затем внутри метода выполните операцию отправки. Наконец, метод мониторинга слушателя запускается отражением, Эта часть логики находится вSubscriberВнутренне Dispatcher представляет собой стратегический интерфейс для отправки событий. В EventBus предусмотрены три реализации Dispatcher по умолчанию, которые используются для распределения событий в различных сценариях:

  1. ImmediateDispatcher: обход всех наблюдателей непосредственно в текущем потоке и распределение событий;
  2. LegacyAsyncDispatcher: асинхронный метод, есть два цикла, один первый и другой, первый используется для непрерывной вставки инкапсулированного объекта-наблюдателя в глобальную очередь, а второй используется для непрерывного удаления объекта-наблюдателя из очереди для распределения событий; в Фактически, EventBus имеет подкласс AsyncEventBus, который использует этот диспетчер для распределения событий.
  3. PerThreadQueuedDispatcher: этот диспетчер использует две локальные переменные потока для управления, когдаdispatch()При вызове метода он сначала получает очередь наблюдателей текущего потока и передает список входящих наблюдателей в очередь, а затем использует локальную переменную потока логического типа, чтобы определить, выполняет ли текущий поток операцию распределения. никакая операция распределения не выполняется, распределение событий выполняется путем обхода вышеуказанной очереди.

Вышеупомянутые три дистрибьютора в конечном итоге позвонятdispatchEvent()метод отправки событий:

final void dispatchEvent(final Object event) {
    // 使用指定的执行器执行任务
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                // 使用反射触发监听方法
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                // 使用EventBus内部的SubscriberExceptionHandler处理异常
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}

в вышеуказанном методеexecutorЯвляется ли исполнитель, это черезEventBusПолучено; также передается тип SubscriberExceptionHandler, обрабатывающий исключенияEventBusполученный. (Оказывается, здесь используются поля конструктора в EventBus!) Что касается вызова метода триггера отражения, то здесь не слишком сложная логика.

Кроме того, следует отметить, что у подписчика также есть класс слов SynchronizedSubscriber, который отличается от обычного подписчика тем, что его метод, запускаемый отражением, вызываетсяsychronizedМодификация ключевого слова, то есть его метод триггера заблокирован и потокобезопасен.

Суммировать:

На данный момент мы завершили анализ исходного кода EventBus. Кратко резюмируя:

EventBus поддерживает три и четыре кэш-карты:

  1. Отображение типов событий в списках наблюдателей (кэш);
  2. Сопоставление типов событий спискам методов прослушивателя (кэш);
  3. Отображение (кэш) типов событий в список типов событий и типов всех их суперклассов;
  4. Отображение наблюдателей на слушателей и сопоставление наблюдателей с методами мониторинга;

Подписчик Observer инкапсулирует прослушиватели и методы прослушивания, которые можно напрямую отражать и запускать. И если он сопоставляется с прослушивателем, необходимо определить тип вызываемого метода слушателя. Лично я считаю этот дизайн отличным, потому что нам больше не нужно поддерживать кеш сопоставления в EventBus, потому что сопоставление один к одному было выполнено в Subscriber.

Каждый раз, когда вы используете EventBus для регистрации и отменить регистрацию слушателя, он сначала будет получен из кэша, и отражение не будет использоваться каждый раз. Это может повысить эффективность приобретения, а также решить проблему эффективности, которую мы подняли в начале. Кажется неизбежен при использовании отражения для вызовов метода триггера.

Наконец, в EventBus используется множество структур данных, таких как MultiMap, CopyOnWriteArraySet и т. д., а также некоторые библиотеки инструментов кэширования и сопоставления, большинство из которых исходит из Guava.

Прочитав реализацию EventBus, я искренне чувствую, что инженеры Google действительно молодцы! И в Гуаве есть много более богатого контента, достойного нашего изучения!

Чтобы понять локальный обход потока, вы можете обратиться к другому моему сообщению в блоге:Использование ThreadLocal и реализация его исходного кода

Большое спасибо за внимание~