Подписка и публикация событий в Spring Cloud Bus (2)

задняя часть сервер Spring

в предыдущих статьяхПодписка и публикация событий в Spring Cloud Bus (1)Вводятся связанные события шины сообщений. В этой статье в основном представлены прослушиватель событий шины сообщений, а также подписка и публикация сообщений.

прослушиватель событий

Spring Cloud Bus, определение прослушивателя событий может быть реализовано какApplicationListenerинтерфейс или использовать@EventListenerформа аннотаций. Давайте взглянем на диаграмму классов прослушивателя событий.

listener
слушатель
ApplicationListenerЕсть две реализации интерфейса: прослушиватель обновленияRefreshListenerи слушатель изменения средыEnvironmentChangeListener.

RefreshListener

RefreshListenerСоответствующее событиеRefreshRemoteApplicationEvent,

public class RefreshListener
		implements ApplicationListener<RefreshRemoteApplicationEvent> {
	private ContextRefresher contextRefresher;

	public RefreshListener(ContextRefresher contextRefresher) {
		this.contextRefresher = contextRefresher;
	}

	@Override
	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
		Set<String> keys = contextRefresher.refresh();
		log.info("Received remote refresh request. Keys refreshed " + keys);
	}
}

Для обработки времени обновления вызовитеContextRefresherизrefresh()метод, в то время как тот, который определен в Spring Cloud ContextContextRefresherФункция для обеспечения обновления контекста. Давайте взглянемrefresh()метод.

	public synchronized Set<String> refresh() {
		Map<String, Object> before = extract(
				this.context.getEnvironment().getPropertySources());
		addConfigFilesToEnvironment();
		Set<String> keys = changes(before,
				extract(this.context.getEnvironment().getPropertySources())).keySet();
		this.context.publishEvent(new EnvironmentChangeEvent(keys));
		this.scope.refreshAll();
		return keys;
	}

Реализация очень проста: сначала получите ключ-значение предыдущей переменной среды, затем перезагрузите файл новой конфигурации среды, сравните набор карт старой и новой переменных среды, а затем опубликуйте новое изменение среды.EnvironmentChangeEventмероприятие.this.scope.refreshAll()Уничтожает все bean-компоненты текущего экземпляра в этой области и вызывает обновление при следующем выполнении метода.

EnvironmentChangeListener

EnvironmentChangeListenerСоответствующий класс событияEnvironmentChangeRemoteApplicationEvent.

public class EnvironmentChangeListener
		implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {
	@Autowired
	private EnvironmentManager env;

	@Override
	public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
		Map<String, String> values = event.getValues();
		for (Map.Entry<String, String> entry : values.entrySet()) {
			env.setProperty(entry.getKey(), entry.getValue());
		}
	}
}

существуетRefreshListenerПри реализации события вы можете знать, что реализация события, наконец, публикует новое событие.EnvironmentChangeListener. В прослушивателе обновления создается карта измененных переменных среды, которая передается в прослушиватель изменений среды. Описанная выше обработка события изменения среды проходит через измененные атрибуты среды конфигурации и устанавливает новое значение атрибута в соответствующий ключ в среде локального приложения.

TraceListener

TraceListenerРеализация осуществляется через аннотацию@EventListenerВ форме отслеживаемое событие: событие подтвержденияAckRemoteApplicationEventи отправить событиеSentApplicationEvent.

@EventListener
	public void onAck(AckRemoteApplicationEvent event) {
		this.repository.add(getReceivedTrace(event));
	}

	@EventListener
	public void onSend(SentApplicationEvent event) {
		this.repository.add(getSentTrace(event));
	}

	protected Map<String, Object> getSentTrace(SentApplicationEvent event) {
		Map<String, Object> map = new LinkedHashMap<String, Object>();
		map.put("signal", "spring.cloud.bus.sent");
		map.put("type", event.getType().getSimpleName());
		map.put("id", event.getId());
		map.put("origin", event.getOriginService());
		map.put("destination", event.getDestinationService());
		if (log.isDebugEnabled()) {
			log.debug(map);
		}
		return map;
	}

	protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) {
		Map<String, Object> map = new LinkedHashMap<String, Object>();
		map.put("signal", "spring.cloud.bus.ack");
		map.put("event", event.getEvent().getSimpleName());
		map.put("id", event.getAckId());
		map.put("origin", event.getOriginService());
		map.put("destination", event.getAckDestinationService());
		if (log.isDebugEnabled()) {
			log.debug(map);
		}
		return map;
	}

В SentTrace в основном записываются значения атрибутов сигнала, типа типа события, идентификатора, исходного источника службы и пункта назначения службы назначения. В ReceivedTrace он указывает на подтверждение события и в основном записывает значения атрибутов сигнала, события типа события, идентификатора, исходного источника службы и пункта назначения службы назначения. Эта информация хранится в памяти по умолчанию и может быть доступна через/traceКонечная точка получает самую последнюю информацию о событии, как показано на следующем рисунке:

{
    "timestamp": 1517229555629,
    "info": {
        "signal": "spring.cloud.bus.sent",
        "type": "RefreshRemoteApplicationEvent",
        "id": "c73a9792-9409-47af-993c-65526edf0070",
        "origin": "config-server:8888",
        "destination": "config-client:8000:**"
    }
},
{
	"timestamp": 1517227659384,
	"info": {
	    "signal": "spring.cloud.bus.ack",
	    "event": "RefreshRemoteApplicationEvent",
	    "id": "846f3a17-c344-4d29-93f3-01b73c5bf58f",
	    "origin": "config-client:8000",
	    "destination": "config-client:8000:**"
	}
}

Что касается инициирования событий, мы объясним в следующем разделе в связи с подпиской и публикацией сообщений.

Подписка и публикация сообщений

Spring Cloud Busна основеSpring Cloud Stream, подписываться и публиковать сообщения по определенной теме, а события доставляются другим экземплярам службы в виде сообщений.

определение канала

Поскольку он основан на потоке, давайте сначала посмотрим на определения каналов ввода и вывода.

public interface SpringCloudBusClient {

String INPUT = "springCloudBusInput";

String OUTPUT = "springCloudBusOutput";

@Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();

@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();
}

Как видите, шина определяетspringCloudBusInputа такжеspringCloudBusOutputДва канала для подписки и публикации соответственноspringCloudBusНовости.

определение атрибута шины

Во-вторых, давайте взглянем на определение атрибута потока в шине. В базовом приложении мы знаем, что тема подписки на автобусspringCloudBus, давайте взглянем на определения других атрибутов в шине.

@ConfigurationProperties("spring.cloud.bus")
public class BusProperties {

//环境变更相关的属性
private Env env = new Env();
// 刷新事件相关的属性
private Refresh refresh = new Refresh();
//与ack相关的属性
private Ack ack = new Ack();
//与追踪ack相关的属性
private Trace trace = new Trace();
//Spring Cloud Stream消息的话题
private String destination = "springCloudBus";

//标志位,bus是否可用
private boolean enabled = true;

...
}

Приведенный выше атрибут шины устанавливает некоторые значения по умолчанию, что соответствует фактам, мы ничего не делали.spring.cloud.busКонфигурация тоже работает нормально. Путем изменения соответствующих свойств в файле конфигурации можно расширить дополнительные функции шины. env,refresh,ack и trace соответствуют разным событиям.В конфигурационном файле есть атрибут switch, который включен по умолчанию. При необходимости мы можем отключить его.

Мониторинг и отправка сообщений

В двух предыдущих частях речь идет об определении потоковых каналов и основных атрибутах Наконец, давайте посмотрим, как отправлять и отслеживать сообщения по заданной теме в шине. Настроено в META-INF/spring.factoriesEnableAutoConfigurationЭлемент конфигурацииBusAutoConfiguration, он будет автоматически загружаться в контейнер Spring при старте сервиса, а как отправлять и слушать сообщения по заданной теме, таков:

@Configuration
@ConditionalOnBusEnabled //bus启用的开关
@EnableBinding(SpringCloudBusClient.class) //绑定通道
@EnableConfigurationProperties(BusProperties.class)
public class BusAutoConfiguration implements ApplicationEventPublisherAware {

//注入source接口,用于发送消息
@Autowired
@Output(SpringCloudBusClient.OUTPUT)
private MessageChannel cloudBusOutboundChannel;

// 监听RemoteApplicationEvent事件
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
    if (this.serviceMatcher.isFromSelf(event)
            && !(event instanceof AckRemoteApplicationEvent)) {
        //当事件是来自自己的并且不是ack事件,则发送消息
    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
    }
}
//消息的消费,也是事件的发起
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
    if (event instanceof AckRemoteApplicationEvent) {
        //ack事件
        if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                && this.applicationEventPublisher != null) {
            //当开启bus追踪且不是自己的ack事件,则通知所有的注册该事件的监听者,否则直接返回
            this.applicationEventPublisher.publishEvent(event);
        }
        return;
    }
    //消费消息,该消息属于自己
    if (this.serviceMatcher.isForSelf(event)
            && this.applicationEventPublisher != null) {
        //不是自己发布的事件,正常处理
        if (!this.serviceMatcher.isFromSelf(event)) {
            this.applicationEventPublisher.publishEvent(event);
        }
        //消费之后,需要发送ack确认事件
        if (this.bus.getAck().isEnabled()) {
            AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                    this.serviceMatcher.getServiceId(),
                    this.bus.getAck().getDestinationService(),
                    event.getDestinationService(), event.getId(), event.getClass());
            this.cloudBusOutboundChannel
                    .send(MessageBuilder.withPayload(ack).build());
            this.applicationEventPublisher.publishEvent(ack);
        }
    }
    //事件追踪相关,若是开启追踪事件则执行
    if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
        // 不论其来源,准备发送事件,发布了之后供本地消费
        this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                event.getOriginService(), event.getDestinationService(),
                event.getId(), event.getClass()));
    }
}

//...
}

@ConditionalOnBusEnabledAnnotation — это переключатель шины, который включен по умолчанию.@EnableBindingграницаSpringCloudBusClientканал, определенный в . Когда служба приложения запускается, класс автоматической настройки загружает bean-компоненты для конечной точки API шины, обновления, отслеживания ACK и конфигурации переменных среды шины.@OutputУказывает, что цель привязки вывода вывода будет создана платформой, и сообщение будет отправлено по этому каналу. Он также включает в себя два основных метода, перечисленных выше:acceptLocalа такжеacceptRemote.

acceptLocalЭто прослушиватель событий, реализованный на основе аннотаций, и тип отслеживаемых событий:RemoteApplicationEvent, метод обработки этого события заключается в отправке сообщения, когда событие исходит от самого себя, а не является событием подтверждения.

@StreamListenerАннотацияSpring Cloud Streamпредоставляется для идентификации метода как@EnableBindingСлушатель для связанного входного канала.acceptRemoteметод, переданные параметрыRemoteApplicationEventэто сообщение в потоке. Если это событие типа подтверждения, когда включено отслеживание событий и событие не приходит само по себе, событие будет опубликовано.Для событий типа подтверждения обработка завершена; Публикует событие, если self необходимо обработать событие, а событие исходит не от него самого. Следует отметить, что когда отслеживание событий включено, создается событие подтверждения и событие публикуется; наконец, когда отслеживание событий включено, обработка здесь заключается в регистрации отправленного события, чтобы его можно было опубликовать для локального потребления. , независимо от его источника.

Суммировать

Эта статья находится вПредыдущийОсновываясь на представлении событий в Spring Cloud Bus в сочетании с исходным кодом, мы продолжаем знакомить с прослушивателями событий и тем, как подписка на события и публикация реализуются в шине сообщений. Шина сообщений часто используется для распространения изменений состояния и управления выдачей инструкций. Наиболее распространенным сценарием шины сообщений является обновление информации о конфигурации службы приложений, которую необходимо использовать совместно с Config Server.Конечно, реализация шины сообщений фактически основана на Spring Cloud Stream, который инкапсулирует различное промежуточное ПО MQ, а сгенерированные сообщения на самом деле являются push.Изменения в информации о конфигурации.

Подписывайтесь на свежие статьи, приглашаю обратить внимание на мой публичный номер

微信公众号

Ссылаться на

Spring Cloud Bus-v1.3.3