в предыдущих статьяхПодписка и публикация событий в Spring Cloud Bus (1)Вводятся связанные события шины сообщений. В этой статье в основном представлены прослушиватель событий шины сообщений, а также подписка и публикация сообщений.
прослушиватель событий
Spring Cloud Bus
, определение прослушивателя событий может быть реализовано какApplicationListener
интерфейс или использовать@EventListener
форма аннотаций. Давайте взглянем на диаграмму классов прослушивателя событий.
ApplicationListener
Есть две реализации интерфейса: прослушиватель обновленияRefreshListener
и слушатель изменения средыEnvironmentChangeListener
.
RefreshListener
RefreshListener
Соответствующее событиеRefreshRemoteApplicationEvent
,
public class RefreshListener
implements ApplicationListener<RefreshRemoteApplicationEvent> {
private ContextRefresher contextRefresher;
public RefreshListener(ContextRefresher contextRefresher) {
this.contextRefresher = contextRefresher;
}
@Override
public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
Set<String> keys = contextRefresher.refresh();
log.info("Received remote refresh request. Keys refreshed " + keys);
}
}
Для обработки времени обновления вызовитеContextRefresher
изrefresh()
метод, в то время как тот, который определен в Spring Cloud ContextContextRefresher
Функция для обеспечения обновления контекста. Давайте взглянемrefresh()
метод.
public synchronized Set<String> refresh() {
Map<String, Object> before = extract(
this.context.getEnvironment().getPropertySources());
addConfigFilesToEnvironment();
Set<String> keys = changes(before,
extract(this.context.getEnvironment().getPropertySources())).keySet();
this.context.publishEvent(new EnvironmentChangeEvent(keys));
this.scope.refreshAll();
return keys;
}
Реализация очень проста: сначала получите ключ-значение предыдущей переменной среды, затем перезагрузите файл новой конфигурации среды, сравните набор карт старой и новой переменных среды, а затем опубликуйте новое изменение среды.EnvironmentChangeEvent
мероприятие.this.scope.refreshAll()
Уничтожает все bean-компоненты текущего экземпляра в этой области и вызывает обновление при следующем выполнении метода.
EnvironmentChangeListener
EnvironmentChangeListener
Соответствующий класс событияEnvironmentChangeRemoteApplicationEvent
.
public class EnvironmentChangeListener
implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
@Autowired
private EnvironmentManager env;
@Override
public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
Map<String, String> values = event.getValues();
for (Map.Entry<String, String> entry : values.entrySet()) {
env.setProperty(entry.getKey(), entry.getValue());
}
}
}
существуетRefreshListener
При реализации события вы можете знать, что реализация события, наконец, публикует новое событие.EnvironmentChangeListener
. В прослушивателе обновления создается карта измененных переменных среды, которая передается в прослушиватель изменений среды. Описанная выше обработка события изменения среды проходит через измененные атрибуты среды конфигурации и устанавливает новое значение атрибута в соответствующий ключ в среде локального приложения.
TraceListener
TraceListener
Реализация осуществляется через аннотацию@EventListener
В форме отслеживаемое событие: событие подтвержденияAckRemoteApplicationEvent
и отправить событиеSentApplicationEvent
.
@EventListener
public void onAck(AckRemoteApplicationEvent event) {
this.repository.add(getReceivedTrace(event));
}
@EventListener
public void onSend(SentApplicationEvent event) {
this.repository.add(getSentTrace(event));
}
protected Map<String, Object> getSentTrace(SentApplicationEvent event) {
Map<String, Object> map = new LinkedHashMap<String, Object>();
map.put("signal", "spring.cloud.bus.sent");
map.put("type", event.getType().getSimpleName());
map.put("id", event.getId());
map.put("origin", event.getOriginService());
map.put("destination", event.getDestinationService());
if (log.isDebugEnabled()) {
log.debug(map);
}
return map;
}
protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) {
Map<String, Object> map = new LinkedHashMap<String, Object>();
map.put("signal", "spring.cloud.bus.ack");
map.put("event", event.getEvent().getSimpleName());
map.put("id", event.getAckId());
map.put("origin", event.getOriginService());
map.put("destination", event.getAckDestinationService());
if (log.isDebugEnabled()) {
log.debug(map);
}
return map;
}
В SentTrace в основном записываются значения атрибутов сигнала, типа типа события, идентификатора, исходного источника службы и пункта назначения службы назначения. В ReceivedTrace он указывает на подтверждение события и в основном записывает значения атрибутов сигнала, события типа события, идентификатора, исходного источника службы и пункта назначения службы назначения. Эта информация хранится в памяти по умолчанию и может быть доступна через/trace
Конечная точка получает самую последнюю информацию о событии, как показано на следующем рисунке:
{
"timestamp": 1517229555629,
"info": {
"signal": "spring.cloud.bus.sent",
"type": "RefreshRemoteApplicationEvent",
"id": "c73a9792-9409-47af-993c-65526edf0070",
"origin": "config-server:8888",
"destination": "config-client:8000:**"
}
},
{
"timestamp": 1517227659384,
"info": {
"signal": "spring.cloud.bus.ack",
"event": "RefreshRemoteApplicationEvent",
"id": "846f3a17-c344-4d29-93f3-01b73c5bf58f",
"origin": "config-client:8000",
"destination": "config-client:8000:**"
}
}
Что касается инициирования событий, мы объясним в следующем разделе в связи с подпиской и публикацией сообщений.
Подписка и публикация сообщений
Spring Cloud Bus
на основеSpring Cloud Stream
, подписываться и публиковать сообщения по определенной теме, а события доставляются другим экземплярам службы в виде сообщений.
определение канала
Поскольку он основан на потоке, давайте сначала посмотрим на определения каналов ввода и вывода.
public interface SpringCloudBusClient {
String INPUT = "springCloudBusInput";
String OUTPUT = "springCloudBusOutput";
@Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();
@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();
}
Как видите, шина определяетspringCloudBusInput
а такжеspringCloudBusOutput
Два канала для подписки и публикации соответственноspringCloudBus
Новости.
определение атрибута шины
Во-вторых, давайте взглянем на определение атрибута потока в шине. В базовом приложении мы знаем, что тема подписки на автобусspringCloudBus
, давайте взглянем на определения других атрибутов в шине.
@ConfigurationProperties("spring.cloud.bus")
public class BusProperties {
//环境变更相关的属性
private Env env = new Env();
// 刷新事件相关的属性
private Refresh refresh = new Refresh();
//与ack相关的属性
private Ack ack = new Ack();
//与追踪ack相关的属性
private Trace trace = new Trace();
//Spring Cloud Stream消息的话题
private String destination = "springCloudBus";
//标志位,bus是否可用
private boolean enabled = true;
...
}
Приведенный выше атрибут шины устанавливает некоторые значения по умолчанию, что соответствует фактам, мы ничего не делали.spring.cloud.bus
Конфигурация тоже работает нормально. Путем изменения соответствующих свойств в файле конфигурации можно расширить дополнительные функции шины. env,refresh,ack и trace соответствуют разным событиям.В конфигурационном файле есть атрибут switch, который включен по умолчанию. При необходимости мы можем отключить его.
Мониторинг и отправка сообщений
В двух предыдущих частях речь идет об определении потоковых каналов и основных атрибутах Наконец, давайте посмотрим, как отправлять и отслеживать сообщения по заданной теме в шине. Настроено в META-INF/spring.factoriesEnableAutoConfiguration
Элемент конфигурацииBusAutoConfiguration
, он будет автоматически загружаться в контейнер Spring при старте сервиса, а как отправлять и слушать сообщения по заданной теме, таков:
@Configuration
@ConditionalOnBusEnabled //bus启用的开关
@EnableBinding(SpringCloudBusClient.class) //绑定通道
@EnableConfigurationProperties(BusProperties.class)
public class BusAutoConfiguration implements ApplicationEventPublisherAware {
//注入source接口,用于发送消息
@Autowired
@Output(SpringCloudBusClient.OUTPUT)
private MessageChannel cloudBusOutboundChannel;
// 监听RemoteApplicationEvent事件
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
if (this.serviceMatcher.isFromSelf(event)
&& !(event instanceof AckRemoteApplicationEvent)) {
//当事件是来自自己的并且不是ack事件,则发送消息
this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
}
}
//消息的消费,也是事件的发起
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
if (event instanceof AckRemoteApplicationEvent) {
//ack事件
if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
&& this.applicationEventPublisher != null) {
//当开启bus追踪且不是自己的ack事件,则通知所有的注册该事件的监听者,否则直接返回
this.applicationEventPublisher.publishEvent(event);
}
return;
}
//消费消息,该消息属于自己
if (this.serviceMatcher.isForSelf(event)
&& this.applicationEventPublisher != null) {
//不是自己发布的事件,正常处理
if (!this.serviceMatcher.isFromSelf(event)) {
this.applicationEventPublisher.publishEvent(event);
}
//消费之后,需要发送ack确认事件
if (this.bus.getAck().isEnabled()) {
AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
this.serviceMatcher.getServiceId(),
this.bus.getAck().getDestinationService(),
event.getDestinationService(), event.getId(), event.getClass());
this.cloudBusOutboundChannel
.send(MessageBuilder.withPayload(ack).build());
this.applicationEventPublisher.publishEvent(ack);
}
}
//事件追踪相关,若是开启追踪事件则执行
if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
// 不论其来源,准备发送事件,发布了之后供本地消费
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
event.getOriginService(), event.getDestinationService(),
event.getId(), event.getClass()));
}
}
//...
}
@ConditionalOnBusEnabled
Annotation — это переключатель шины, который включен по умолчанию.@EnableBinding
границаSpringCloudBusClient
канал, определенный в . Когда служба приложения запускается, класс автоматической настройки загружает bean-компоненты для конечной точки API шины, обновления, отслеживания ACK и конфигурации переменных среды шины.@Output
Указывает, что цель привязки вывода вывода будет создана платформой, и сообщение будет отправлено по этому каналу.
Он также включает в себя два основных метода, перечисленных выше:acceptLocal
а такжеacceptRemote
.
acceptLocal
Это прослушиватель событий, реализованный на основе аннотаций, и тип отслеживаемых событий:RemoteApplicationEvent
, метод обработки этого события заключается в отправке сообщения, когда событие исходит от самого себя, а не является событием подтверждения.
@StreamListener
АннотацияSpring Cloud Stream
предоставляется для идентификации метода как@EnableBinding
Слушатель для связанного входного канала.acceptRemote
метод,
переданные параметрыRemoteApplicationEvent
это сообщение в потоке. Если это событие типа подтверждения, когда включено отслеживание событий и событие не приходит само по себе, событие будет опубликовано.Для событий типа подтверждения обработка завершена;
Публикует событие, если self необходимо обработать событие, а событие исходит не от него самого. Следует отметить, что когда отслеживание событий включено, создается событие подтверждения и событие публикуется; наконец, когда отслеживание событий включено, обработка здесь заключается в регистрации отправленного события, чтобы его можно было опубликовать для локального потребления. , независимо от его источника.
Суммировать
Эта статья находится вПредыдущийОсновываясь на представлении событий в Spring Cloud Bus в сочетании с исходным кодом, мы продолжаем знакомить с прослушивателями событий и тем, как подписка на события и публикация реализуются в шине сообщений. Шина сообщений часто используется для распространения изменений состояния и управления выдачей инструкций. Наиболее распространенным сценарием шины сообщений является обновление информации о конфигурации службы приложений, которую необходимо использовать совместно с Config Server.Конечно, реализация шины сообщений фактически основана на Spring Cloud Stream, который инкапсулирует различное промежуточное ПО MQ, а сгенерированные сообщения на самом деле являются push.Изменения в информации о конфигурации.