Что такое шаблон прослушивателя событий?
Во-первых, почему я подчеркиваю слово событие, потому что он является целью.В нашем развитии мы определенно видели кучу суффиксов, которые
Listener
Класс, это режим слушателя, режим слушателя - этоCS开发架构
, что является хорошей развязкой дизайна. Слушатель регистрируется в почтовом отделении и подписывается на определенное событие (заранее запланированное), почта будет публиковать сообщения по мере необходимости, а слушатель будет получать сообщения вовремя для обработки. их Во всей среде разработки Java JDK уже определил для нас интерфейс. Spring реализован на основе интерфейса JDK, а Guava — еще один метод реализации. Каждый из них имеет свои преимущества. Давайте посмотрим, является ли это обратным вызовом или блокировкой, или Guava , в некотором роде, Чем он отличается от режима наблюдателя?У нас есть возможность поговорить о
1. Собственная спецификация Java
1. EventObject
Объект события, ему нужен источник события, переданный конструктором
public class EventObject implements java.io.Serializable {
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
........... 其他省略
}
2. EventListener
Event listener, он отвечает за мониторинг событий, JAVA предоставляет пустой интерфейс, напишем
public interface EventListener {
}
3. Резюме
Мы обнаружили, что Java предоставляет только один объект события и один прослушиватель событий, поэтому нам нужно соблюдать эту спецификацию для разработки.
2. Спецификация Java разрабатывает шаблон прослушивателя на основе шаблона обратного вызова.
1. Источник событий — EventSource
При нормальных обстоятельствах он будет установлен на тип объекта, и нам не нужно его разрабатывать.Чтобы отразить роль шаблона проектирования, мы разработали
@ToString
@Setter
@Getter
public class EventSource {
private String name;
private String info;
}
2. Объект события — EventObject
Здесь мы унаследовали EventObject, просто реализовали его ненадолго и особо не занимались упаковкой
public class CoreEventObject extends EventObject {
public CoreEventObject(EventSource source) {
super(source);
}
}
3. Прослушиватель событий — EventListener
Здесь наши настоящие слушатели обычно должны быть оформлены как функциональный интерфейс, я узнал об этом из фреймворка Spring, потому что функциональные интерфейсы могут отражать обратные вызовы.
@FunctionalInterface
public interface CoreEventListener<E extends CoreEventObject> extends EventListener {
void onEventObject(E event);
}
4. Издатель событий — EventPublisher
Издатель события, поскольку нет опубликованного объекта события, где слушатель?
public class EventPublisher<E extends CoreEventObject> {
private CoreEventListener<E> listener;
public EventPublisher(CoreEventListener<E> listener) {
this.listener = listener;
}
public void publish(E object){
System.out.println("发布事件 : " + object);
// 传给 CoreEventListener
listener.onEventObject(object);
}
}
5. Тестовая демонстрация
public class TestDemo {
public static void main(String[] args) {
// 1. 创建一个事件发布者
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() {
@Override
public void onEventObject(CoreEventObject event) {
System.out.println("接收到事件源 : " + event.getSource() + " , 当前线程 : " + Thread.currentThread().getName());
}
});
// 2. 发布一个事件对象
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
..... 此处省略
return eventObject;
}
}
Выходной результат:
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)]
接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 当前线程 : main
Мы обнаружили, что мы успешно получили объект события и источник события, что является прелестью функции ловушки. Фактически, вы только что сделали релиз события и у вас нет намерения наблюдать за другими вещами. Вам нужен только слушатель, чтобы слушать, Таким образом, публикация и мониторинг вашего события полностью отделены друг от друга.На самом деле, нижний уровень представляет собой ссылку на адрес.
3. В чем проблема с функцией обратного вызова?
1. Наша проблема?
Во многих сценариях наши события публикации и события прослушивания полностью находятся в двух потоках, так как же нам получить объект события?
Если мы используем его просто, это будет написано так?
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
@Override
public void onEventObject(CoreEventObject object) {
// 赋值给成员变量
this.object = object;
}
// 获取成员变量
public CoreEventObject getObject() {
return object;
}
}
есть тест:
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
CoreEventObject object = listener.getObject();
// 先去拿 ,后去发布
System.out.println(object.getSource());
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
....
return eventObject;
}
}
выходной результат
Exception in thread "main" java.lang.NullPointerException
at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28)
Некоторые люди скажут, вы ошибаетесь, конечно, вы не можете получить это, потому что они еще не выпустили его, но в случае с многопоточностью и развязкой, как вы узнаете, когда выпуск другой стороны закончился , вы идете, чтобы получить его Что?Это требует многопоточности знания Java.Будущее напоминает нам, что это идея блокировки.Только когда слушатель действительно получает объект, мы можем его получить.
2. Решить проблему
Знайте раздел, о котором я упоминал ранееFutureTask
Как этого добиться, думаю проблема решена.
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
/**
* 当 X = 0 ,代表 obj还没有初始化了
* 当 x = 1 , 代表 obj 以及初始化了 , 已经接收到了
*/
private static volatile int x = 0;
@Override
public void onEventObject(CoreEventObject object) {
this.object = object;
// 收到改成 1
x = 1;
}
public CoreEventObject getObject() {
while (true) {
if (x == 1) {
break;
}
}
// 拿到对象,再设置为1
x = 0;
return object;
}
}
Благодаря этому решению, будет выполнятьсяgetObject()
Поток все время заблокирован, то есть бесконечный цикл, у нас должен быть поток для выполнения этого метода,
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
// 新建一个线程去接收
Thread thread = new Thread(() -> {
System.out.println("我开始接收对象 : " + System.currentTimeMillis());
CoreEventObject object = listener.getObject();
System.out.println("成功接收对象 : "+object.getSource());
});
thread.start();
// 新建一个线程去发布
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
new Thread(()->{
publisher.publish(getCoreEventObject());
}).start();
}
private static CoreEventObject getCoreEventObject(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
EventSource source = new EventSource();
source.setName("事件源");
source.setInfo("" + System.currentTimeMillis());
return new CoreEventObject(source);
}
}
Выходной результат:
我开始接收对象 : 1573282924555
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)]
成功接收对象 : EventSource(name=事件源, info=1573282925590)
Когда мы его получили, мы потерли его в момент времени 1573282924555, а реальный объект был фактически выпущен в 1573282925590, что полностью соответствовало двум временным линиям, поэтому мы успешно решили проблему.
4. ApplicationListener в Spring
1. ApplicationEvent
Класс, который должен быть расширен всеми событиями приложения. Абстрактный, так как не имеет смысла публиковать общие события напрямую.
Этот класс принадлежит
application events
унаследовал. Причина абстракции заключается в прямой публикации этогоApplicationEvent
это бессмысленно.
2. ApplicationListener
Интерфейс, реализуемый прослушивателями событий приложения. Основан на стандартном интерфейсе java.util.EventListener для шаблона проектирования Observer.
Этот интерфейс используется всеми
application event listeners.
Реализовано на основе Javajava.util.EventListener
спецификация интерфейса
3. Начало работы
У нас есть требование, чтобы у нас была служба, которая будет постоянно получать информацию о конфигурации с удаленного устройства и публиковать информацию о конфигурации после внесения изменений.
1. Конфигурация — источник событий
@ToString
@Setter
@Getter
public class Config {
private String namespace;
private Map<String, Object> info;
}
2. ConfigEvent - объект события
// 这个注解,我们是根据Spring源码看到的 , 所以一致性,我就加了
@SuppressWarnings("serial")
public class ConfigEvent extends ApplicationEvent {
public ConfigEvent(Config source) {
super(source);
}
}
3. ConfigEventListener — прослушиватель событий
@Component
public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
// 保证执行顺序 , 多个 ConfigEventListener就需要实现这个接口
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
// 初始化以后要做什么 ?
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前ConfigEventListener");
}
}
5. ConfigServer — служба центра конфигурации
@Service
public class ConfigServer {
// 注入applicationContext,因为只有他才可以执行发布事件
@Autowired
private ApplicationContext applicationContext;
// 这个是开启异步 ,后面会说到
// @Async
public void publishConfig(){
// 需要发布 --- > 改变的事件
System.out.println("发布事件成功 , 当前线程 : "+Thread.currentThread().getName());
applicationContext.publishEvent(getChange());
}
public ConfigEvent getChange(){
Config config = new Config();
config.setNamespace("application");
HashMap<String, Object> conf = new HashMap<>();
conf.put("server.port", 8088);
config.setInfo(conf);
return new ConfigEvent(config);
}
}
6. Запустите тест
@SpringBootApplication
public class SpringListenerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringListenerApplication.class, args);
}
@Autowired
private ConfigServer server;
@Override
public void run(String... args) throws Exception {
server.publishConfig();
}
}
Выходной результат:
.....
初始化当前ConfigEventListener
....
发布事件成功 , 当前线程 : main
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : main
такSpring-Boot
Мониторинг событий по-прежнему очень прост, по аналогии сSpring
Одна причина, я думаю, это знают все, кто это знает, но другая проблема в том, что наши публикации и мониторинг являются одновременно основной нитью, нехорошо, в этом много событий?
4. Включите асинхронную публикацию
Требуются две аннотации@EnableAsync
Запустите асинхронную функцию и@Async
Метод выполняется асинхронно
发布事件成功 , 当前线程 : SimpleAsyncTaskExecutor-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : SimpleAsyncTaskExecutor-1
Мы обнаружили, что появилось выполнение пула потоков. Этот логический пул потоков можно настроить. Нам нужно только явно внедрить следующиеSimpleAsyncTaskExecutor
Бин сделает
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
// 需要传入一个 ThreadFactory实现类 , 所以看过我前面写的文章应该会写这个,比如 JUC- Executor那节
executor.setThreadFactory(new MyThreadFactory("anthony"));
return executor;
}
Выходной результат:
发布事件成功 , 当前线程 : anthony-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
5. Несколько слушателей выполняются последовательно
Вы можете реализовать, как яApplicationListener
а такжеOrdered
, или вы можете напрямую реализоватьSmartApplicationListener
Это все равно, ни хорошо, ни плохо
Слушатель первый:
@Component
public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前监听器 : " + this.toString());
}
}
Второй слушатель:
@Component
public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE-1;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前监听器 : " + this.toString());
}
}
Выходной результат:
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289
.....
发布事件成功 , 当前线程 : anthony-1
ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
ConfigEventListenerEnd 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
5. EventBus в Гуаве
EventBus от Guava — хороший инструмент управления регистрацией и публикацией событий, относится к режиму push, который очень похож на Spring.
1. Зависимость
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
2. Быстрый старт
Главный объект,
EventBus
Шина событий, которая управляет всеми слушателями (или подписчиками) черезEventBus#register
илиEventBus#unRegister
Для управления, в то же время, слушатель должен отслеживать событие, которое основано на уровне метода (обратите внимание, что метод может иметь только один параметр, который является наблюдаемым событием), который необходимо добавить в метод.@Subscribe
Аннотация для указания мониторинга,EventBus
в состоянии пройтиEventBus#post
для публикации события, его получит слушатель соответствующего типа.EventBus
Может обрабатывать исключения
public class QuicklyStart {
public static void main(String[] args) {
// 创建一个 事件总线
EventBus bus = new EventBus(new SubscriberExceptionHandler() {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
// 处理订阅者异常信息
System.out.println("异常信息 : "+exception.getMessage() + ", 异常事件 : " + context.getEvent());
}
});
// 注册你的监听器 , 其实更加准确来说是订阅者 , 他属于一种发布订阅模式
bus.register(new EventListener());
// 事件总线发布事件
bus.post("sb");
// 事件总线发布事件
bus.post(new Event("hello Guava"));
}
}
/**
* 事件源
*/
class Event {
String msg;
public Event(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Event{" + "msg='" + msg + '\'' + '}';
}
}
/**
* 监听器
*/
class EventListener {
/**
* {@link Subscribe} 一个这个代表一个订阅者,EventBus会将符合的事件发布到对应的订阅者上 , 但是不支持java的基本数据类型, int 之类的
*
* @param event
*/
@Subscribe
public void onEvent(Event event) {
System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
@Subscribe
public void onStringEvent(String event) {
error(); // 模拟异常
System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
private void error() {
int i = 1 / 0;
}
}
Выход :
异常信息 : / by zero, 异常事件 : sb
当前线程 : main, 接收到事件 : Event{msg='hello Guava'}
3. Обоснование
На самом деле это очень просто, при первой регистрации:
/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
// 其实就是注解扫描 , 然后把元信息都给整出来
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// 查找有没有
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
// 没有创建一个对象
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
// 添加进去 ,其实是 Subscriber对象 , 把method信息, 对象信息全部封装进去了
eventSubscribers.addAll(eventMethodsInListener);
}
}
Второй — опубликовать
public void post(Object event) {
// 根据事件获取对应的 Subscriber
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 发布出去
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 获取当前线程的队列 ,用ThreadLocal维护的线程安全, 其实是为了安全
Queue<Event> queueForThread = queue.get();
// 创建一个事件对象
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 处理
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
// 关键点 ,就是这
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
// 出现异常直接就 ... 调用异常回调方法了
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// 原来如此 , .........method.invok 真.... 所以可以抓取异常
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
Так что это очень просто, принцип можно проанализировать с первого взгляда
6. Резюме
Мы нашли собственный реализованный слушатель иSpring
а такжеGuava
В чем разница между этими двумя реализациями, это не что иное как реализованный нами режим слушателя.Для управления слушателями мы этого не делали.Мы просто Издатель и Слушатель, отношения один к одному.Это очень плохо, 100 слушателей. Требуется 100 издателей, что не соответствует принципам шаблонов проектирования, поэтому см.Guava
, мы обнаружили, что то, что он делает, является не чем иным, как управлением слушателями, но есть одна деталь, я надеюсь, что все знают, для режима слушателя, в случае сбоя публикации события, как мы узнаем, так чтоGuava
По крайней мере у нас он не основан на механизме обратного вызова, а используетJava
изMethod#invoke
, это зависит от спроса, но обратный вызов более легкий,