Предисловие: Чжан Тянь, автор этой статьи, представляет собой выдержку из «Продвинутой архитектуры микросервисов Spring Cloud», написанную в соавторстве с автором, которая будет опубликована в августе. В этой статье извлекается содержимое приложения Spring Cloud Stream и пользовательского модуля привязки Rocketmq, а также в основном рассказывается о привязке RocketMQ, которая реализует Spring Cloud Stream.
Механизм Stream Binder
В предыдущей статье я представил основные концепции Spring Cloud Stream и его модель программирования. Кроме того, Spring Cloud Stream предоставляет интерфейс Binder для привязки к внешним очередям сообщений. В этой статье будут описаны основные концепции, основные компоненты и детали реализации Binder SPI. Binder SPI предоставляет механизм связывания для привязки к внешним очередям сообщений через ряд интерфейсов, классов инструментов и механизмов обнаружения. Ключевым моментом SPI является интерфейс Binder, который отвечает за обеспечение конкретной реализации связывания с внешними очередями сообщений.
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
Типичная реализация пользовательского компонента Binder должна включать следующее:
- Класс, реализующий интерфейс Binder.
- Класс Spring @Configuration для создания экземпляров вышеуказанных типов.
- Файл META-INF/spring.binders в пути к классам, который содержит пользовательские классы конфигурации, связанные с Binder, например:
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
Spring Cloud Stream выполняет задачу привязки канала и очереди сообщений на основе реализации Binder SPI. Различные типы ПО промежуточного слоя очередей сообщений реализуют разные связыватели. Например: Spring-Cloud-Stream-Binder-Kafka — это реализация Binder для Kafka, а Spring-Cloud-Stream-Binder-Rabbit — это реализация Binder для RabbitMQ.
Spring Cloud Stream использует механизм автоматической настройки Spring Boot для настройки Binder. Если реализация Binder найдена в пути к классам проекта, Spring Cloud Stream автоматически использует ее. Например, проекту Spring Cloud Stream необходимо связать связующее ПО промежуточного слоя RabbitMQ, а следующие зависимости добавляются в файл pom для упрощения реализации.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Binder For RocketMQ
Spring Cloud Stream предоставляет полный набор настраиваемых механизмов для доступа к различным очередям сообщений путем разработки Binder для каждой очереди сообщений для доступа к очереди сообщений. В настоящее время официально признанным связующим является связующее вещество RabbitMQ и связующее вещество kafka. Но разработчики могут сделать свой собственный Binder на основе механизма Stream Binder. Давайте создадим простой RocketMQ Binder.
класс конфигурации
Вам необходимо настроить класс конфигурации для RocketMQ в файле resources/META-INF/spring.binders, который будет использовать @Import для импорта класса конфигурации, созданного для RocketMQ.RocketMessageChannelBinderConfiguration
.
rocket:\
org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration
RocketMessageChannelBinderConfiguration
Будут предоставлены два чрезвычайно важных экземпляра компонента, а именноRocketMessageChannelBinder
иRocketExchangeQueueProvisioner
.RocketMessageChannelBinder
В основном используется для привязки канала и очереди сообщений, иRocketExchangeQueueProvisioner
Он инкапсулирует связанные API-интерфейсы RocketMQ, которые можно использовать для создания основных компонентов очередей сообщений, таких как очереди, обмены и т. д.
@Configuration
public class RocketMessageChannelBinderConfiguration {
@Autowired
private ConnectionFactory rocketConnectionFactory;
@Autowired
private RocketProperties rocketProperties;
@Bean
RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
this.rocketProperties, provisioningProvider());
return binder;
}
@Bean
RocketExchangeQueueProvisioner provisioningProvider() {
return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
}
}
RocketMessageChannelBinder
унаследованный абстрактный классAbstractMessageChannelBinder
и реализует функции #producerMessageHandler и #createConsumerEndpoint.
MessageHandler имеет возможность отправлять сообщения в очередь сообщений.Функция #createProducerMessageHandler предназначена для создания объекта MessageHandler для отправки сообщений выходного канала в очередь сообщений.
protected MessageHandler createProducerMessageHandler(
ProducerDestination destination,
ExtendedProducerProperties<RocketProducerProperties> producerProperties,
MessageChannel errorChannel)
throws Exception {
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
return endpoint;
}
MessageProducer может получать сообщения из очереди сообщений и отправлять сообщение во входной канал.
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group,
ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
return rocketInboundChannelAdapter;
}
Процесс реализации функции приема сообщений
Подобно Binder RabbitMQ, необходимо реализовать следующую серию классов для реализации доставки сообщений из RocketMQ в соответствующий MessageChannel.
- RocketBlockingQueueConsumer.InnerConsumer реализует MessageListenerConcurrently для получения сообщений, доставленных RocketMQ.
- RocketBlockingQueueConsumer регистрирует InnerConsumer в DefaultMQPushConsumer RocketMQ для получения сообщений от RocketMQ и сохранения их в собственной очереди блокировки. Для получения SimpleRocketMessageListenerContainer.
- SimpleRocketMessageListenerContainer запускает поток для непрерывного получения сообщений от RocketBlockingQueueConsumer, а затем вызывает функцию обратного вызова RocketInboundChannelAdapter.Listener для передачи сообщения RocketInboundChannelAdapter.
- RocketInboundChannelAdapter.Listener используется SimpleRocketMessageListenerContainer в качестве обратного вызова для отправки сообщений в RocketInboundChannelAdapter.
- RocketInboundChannelAdapter принимает сообщение, переданное SimpleRocketMessageListenerContainer, а затем отправляет его в соответствующий MessageChannel через MessageTemplate. Таким образом передается функции, украшенной @StreamListener.
InnerConsumer получает сообщения RocketMQ
Интерфейс MessageListenerConcurrently, реализованный InnerConsumer, представляет собой интерфейс в RocketMQ для одновременного получения асинхронных сообщений.Этот интерфейс может получать асинхронные сообщения, отправленные RocketMQ. После получения сообщения InnerConsumer инкапсулирует сообщение как RocketDelivery и добавит его в очередь блокировки.
RocketBlockingQueueConsumer имеет очередь блокировки для хранения сообщений, доставленных RocketMQ в RocketBlockingQueueConsumer.InnerConsumer, а функция nextMessage может извлекать сообщение из очереди блокировки и возвращаться.
AsyncMessageProcessingConsumer получает сообщение
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer реализует интерфейс Runnable.В интерфейсе run() метод receiveAndExecute самого SimpleRocketMessageListenerContainer будет вызываться в бесконечном цикле.
@Override
public void run() {
if (!isActive()) {
return;
}
try {
//只要consumer的状态正常,就会一直循环
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer);
}
catch (ListenerExecutionFailedException ex) {
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
} catch (Throwable e) {
}
}
} catch (Exception e) {
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
this.start.countDown();
if (!isActive(this.consumer) || aborted) {
this.consumer.stop();
}
else {
restart(this.consumer);
}
}
Последняя функция функции #receiveAndExecute — вызвать nextMessage класса RocketBlockingQueueConsumer, а затем вызвать функцию messageListener.onMessage для передачи сообщения.
Инициализировать RocketBlockingQueueConsumer и AsyncMessageProcessingConsumer
Функция doStart SimpleRocketMessageListenerContainer инициализирует RocketBlockingQueueConsumer и запускает AsyncMessageProcessingConsumer SimpleRocketMessageListenerContainer для получения сообщений, доставленных RocketMQ из RocketBlockingQueueConsumer в бесконечном цикле.
private void doStart() {
synchronized (this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
//初始化Consumer
int newConsumers = initializeConsumers();
if (this.consumers == null) {
return;
}
if (newConsumers <= 0) {
return;
}
Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
new HashSet<>();
//对于每个RocketBlockingQueueConsumer启动一个
//AsyncMessageProcessingConsumer来执行任务
for (RocketBlockingQueueConsumer consumer : this.consumers) {
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
}
}
}
Отправить сообщение в MessageChannel
RocketInboundChannelAdapter реализует интерфейс MessageProducer. В основном он передает сообщение из SimpleRocketMessageListenerContainer в MessageChannel через MessageTemplate.
Далее идет реализация RocketInboundChannelAdapter.Listener, который является messageListener в функции RocketBlockingQueueConsumer.nextMessage.
public class Listener implements ChannelAwareMessageListener, RetryListener {
public void onMessage(Message message, Channel channel) throws Exception {
try {
this.createAndSend(message, channel);
} catch (RuntimeException var7) {
if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
throw var7;
}
RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
}
}
private void createAndSend(Message message, Channel channel) {
org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
}
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
return messagingMessage;
}
}
Менеджер RocketMQ
RocketProvisioningProvider реализует интерфейс ProvisioningProvider, который имеет две функции: provisionProducerDestination и provisionConsumerDestination, которые используются для создания ProducerDestination и ConsumerDestination соответственно. Реализация RocketProvisioningProvider аналогична RabbitProvisioningProvider. Просто связанные с RocketMQ API, реализованные RocketAdmin, используются при объявлении очередей, обменов и привязок.
Суммировать
В этой статье кратко представлена реализация связывателя Rocketmq Spring Cloud Stream, и не будет объясняться конкретный код из-за нехватки места. Заинтересованные читатели могут следить за кодом на GitHub. Согласно абстрактному интерфейсу Spring Cloud Stream, мы можем свободно реализовывать биндеры для различных очередей сообщений.
Адрес проекта на GitHub: https://github.com/ztelur/spring-cloud-stream-binder-rocket Рекомендуемое чтение:Приложение Spring Cloud Stream и пользовательское связующее устройство RocketMQ: модель программирования