Приложение Spring Cloud Stream и пользовательский привязчик RocketMQ: реализация привязки RocketMQ

задняя часть GitHub Spring RabbitMQ

Предисловие: Чжан Тянь, автор этой статьи, представляет собой выдержку из «Продвинутой архитектуры микросервисов Spring Cloud», написанную в соавторстве с автором, которая будет опубликована в августе. В этой статье извлекается содержимое приложения Spring Cloud Stream и пользовательского модуля привязки Rocketmq, а также в основном рассказывается о привязке RocketMQ, которая реализует Spring Cloud Stream.

Механизм Stream Binder

В предыдущей статье я представил основные концепции Spring Cloud Stream и его модель программирования. Кроме того, Spring Cloud Stream предоставляет интерфейс Binder для привязки к внешним очередям сообщений. В этой статье будут описаны основные концепции, основные компоненты и детали реализации Binder SPI. Binder SPI предоставляет механизм связывания для привязки к внешним очередям сообщений через ряд интерфейсов, классов инструментов и механизмов обнаружения. Ключевым моментом SPI является интерфейс Binder, который отвечает за обеспечение конкретной реализации связывания с внешними очередями сообщений.

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

Типичная реализация пользовательского компонента Binder должна включать следующее:

  • Класс, реализующий интерфейс Binder.
  • Класс Spring @Configuration для создания экземпляров вышеуказанных типов.
  • Файл META-INF/spring.binders в пути к классам, который содержит пользовательские классы конфигурации, связанные с Binder, например:
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

Spring Cloud Stream выполняет задачу привязки канала и очереди сообщений на основе реализации Binder SPI. Различные типы ПО промежуточного слоя очередей сообщений реализуют разные связыватели. Например: Spring-Cloud-Stream-Binder-Kafka — это реализация Binder для Kafka, а Spring-Cloud-Stream-Binder-Rabbit — это реализация Binder для RabbitMQ.

Spring Cloud Stream использует механизм автоматической настройки Spring Boot для настройки Binder. Если реализация Binder найдена в пути к классам проекта, Spring Cloud Stream автоматически использует ее. Например, проекту Spring Cloud Stream необходимо связать связующее ПО промежуточного слоя RabbitMQ, а следующие зависимости добавляются в файл pom для упрощения реализации.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Binder For RocketMQ

Spring Cloud Stream предоставляет полный набор настраиваемых механизмов для доступа к различным очередям сообщений путем разработки Binder для каждой очереди сообщений для доступа к очереди сообщений. В настоящее время официально признанным связующим является связующее вещество RabbitMQ и связующее вещество kafka. Но разработчики могут сделать свой собственный Binder на основе механизма Stream Binder. Давайте создадим простой RocketMQ Binder.

класс конфигурации

Вам необходимо настроить класс конфигурации для RocketMQ в файле resources/META-INF/spring.binders, который будет использовать @Import для импорта класса конфигурации, созданного для RocketMQ.RocketMessageChannelBinderConfiguration.

rocket:\
org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration

RocketMessageChannelBinderConfigurationБудут предоставлены два чрезвычайно важных экземпляра компонента, а именноRocketMessageChannelBinderиRocketExchangeQueueProvisioner.RocketMessageChannelBinderВ основном используется для привязки канала и очереди сообщений, иRocketExchangeQueueProvisionerОн инкапсулирует связанные API-интерфейсы RocketMQ, которые можно использовать для создания основных компонентов очередей сообщений, таких как очереди, обмены и т. д.

@Configuration
public class RocketMessageChannelBinderConfiguration {
    @Autowired
    private ConnectionFactory rocketConnectionFactory;
    @Autowired
    private RocketProperties  rocketProperties;
    @Bean
    RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
        RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
                this.rocketProperties, provisioningProvider());
        return binder;
    }
    @Bean
    RocketExchangeQueueProvisioner provisioningProvider() {
        return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
    }
}

RocketMessageChannelBinderунаследованный абстрактный классAbstractMessageChannelBinderи реализует функции #producerMessageHandler и #createConsumerEndpoint.

MessageHandler имеет возможность отправлять сообщения в очередь сообщений.Функция #createProducerMessageHandler предназначена для создания объекта MessageHandler для отправки сообщений выходного канала в очередь сообщений.

protected MessageHandler createProducerMessageHandler(
        ProducerDestination destination,             
        ExtendedProducerProperties<RocketProducerProperties> producerProperties,                    
        MessageChannel errorChannel) 
        throws Exception {
    final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
            buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
    return endpoint;
}

MessageProducer может получать сообщения из очереди сообщений и отправлять сообщение во входной канал.

@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group,
                                                    ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
    SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
    RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
    return rocketInboundChannelAdapter;
}

Процесс реализации функции приема сообщений

Подобно Binder RabbitMQ, необходимо реализовать следующую серию классов для реализации доставки сообщений из RocketMQ в соответствующий MessageChannel.

  1. RocketBlockingQueueConsumer.InnerConsumer реализует MessageListenerConcurrently для получения сообщений, доставленных RocketMQ.
  2. RocketBlockingQueueConsumer регистрирует InnerConsumer в DefaultMQPushConsumer RocketMQ для получения сообщений от RocketMQ и сохранения их в собственной очереди блокировки. Для получения SimpleRocketMessageListenerContainer.
  3. SimpleRocketMessageListenerContainer запускает поток для непрерывного получения сообщений от RocketBlockingQueueConsumer, а затем вызывает функцию обратного вызова RocketInboundChannelAdapter.Listener для передачи сообщения RocketInboundChannelAdapter.
  4. RocketInboundChannelAdapter.Listener используется SimpleRocketMessageListenerContainer в качестве обратного вызова для отправки сообщений в RocketInboundChannelAdapter.
  5. RocketInboundChannelAdapter принимает сообщение, переданное SimpleRocketMessageListenerContainer, а затем отправляет его в соответствующий MessageChannel через MessageTemplate. Таким образом передается функции, украшенной @StreamListener.

InnerConsumer получает сообщения RocketMQ

Интерфейс MessageListenerConcurrently, реализованный InnerConsumer, представляет собой интерфейс в RocketMQ для одновременного получения асинхронных сообщений.Этот интерфейс может получать асинхронные сообщения, отправленные RocketMQ. После получения сообщения InnerConsumer инкапсулирует сообщение как RocketDelivery и добавит его в очередь блокировки.

RocketBlockingQueueConsumer имеет очередь блокировки для хранения сообщений, доставленных RocketMQ в RocketBlockingQueueConsumer.InnerConsumer, а функция nextMessage может извлекать сообщение из очереди блокировки и возвращаться.

AsyncMessageProcessingConsumer получает сообщение

SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer реализует интерфейс Runnable.В интерфейсе run() метод receiveAndExecute самого SimpleRocketMessageListenerContainer будет вызываться в бесконечном цикле.

@Override
public void run() {
    if (!isActive()) {
        return;
    }
    try {
        //只要consumer的状态正常,就会一直循环
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            try {
                boolean receivedOk = receiveAndExecute(this.consumer);
            }
            catch (ListenerExecutionFailedException ex) {
                if (ex.getCause() instanceof NoSuchMethodException) {
                    throw new FatalListenerExecutionException("Invalid listener", ex);
                }
            }
            catch (AmqpRejectAndDontRequeueException rejectEx) {
            } catch (Throwable e) {
            }
        }
    } catch (Exception e) {
    }
    finally {
        if (getTransactionManager() != null) {
            ConsumerChannelRegistry.unRegisterConsumerChannel();
        }
    }
    this.start.countDown();
    if (!isActive(this.consumer) || aborted) {
        this.consumer.stop();
    }
    else {
        restart(this.consumer);
    }
}

Последняя функция функции #receiveAndExecute — вызвать nextMessage класса RocketBlockingQueueConsumer, а затем вызвать функцию messageListener.onMessage для передачи сообщения.

Инициализировать RocketBlockingQueueConsumer и AsyncMessageProcessingConsumer

Функция doStart SimpleRocketMessageListenerContainer инициализирует RocketBlockingQueueConsumer и запускает AsyncMessageProcessingConsumer SimpleRocketMessageListenerContainer для получения сообщений, доставленных RocketMQ из RocketBlockingQueueConsumer в бесконечном цикле.

private void doStart() {
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        //初始化Consumer
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            return;
        }
        if (newConsumers <= 0) {
            return;
        }
        Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
                new HashSet<>();
        //对于每个RocketBlockingQueueConsumer启动一个
        //AsyncMessageProcessingConsumer来执行任务
        for (RocketBlockingQueueConsumer consumer : this.consumers) {
            SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
                    processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            getTaskExecutor().execute(processor);
        }
    }
}

Отправить сообщение в MessageChannel

RocketInboundChannelAdapter реализует интерфейс MessageProducer. В основном он передает сообщение из SimpleRocketMessageListenerContainer в MessageChannel через MessageTemplate.

Далее идет реализация RocketInboundChannelAdapter.Listener, который является messageListener в функции RocketBlockingQueueConsumer.nextMessage.

public class Listener implements ChannelAwareMessageListener, RetryListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            this.createAndSend(message, channel);
        } catch (RuntimeException var7) {
            if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
                throw var7;
            }
       RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
        }
    }
    private void createAndSend(Message message, Channel channel) {
        org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
        RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
    }
    private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
        Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
        org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
        return messagingMessage;
    }
}

Менеджер RocketMQ

RocketProvisioningProvider реализует интерфейс ProvisioningProvider, который имеет две функции: provisionProducerDestination и provisionConsumerDestination, которые используются для создания ProducerDestination и ConsumerDestination соответственно. Реализация RocketProvisioningProvider аналогична RabbitProvisioningProvider. Просто связанные с RocketMQ API, реализованные RocketAdmin, используются при объявлении очередей, обменов и привязок.

Суммировать

В этой статье кратко представлена ​​реализация связывателя Rocketmq Spring Cloud Stream, и не будет объясняться конкретный код из-за нехватки места. Заинтересованные читатели могут следить за кодом на GitHub. Согласно абстрактному интерфейсу Spring Cloud Stream, мы можем свободно реализовывать биндеры для различных очередей сообщений.

Адрес проекта на GitHub: https://github.com/ztelur/spring-cloud-stream-binder-rocket Рекомендуемое чтение:Приложение Spring Cloud Stream и пользовательское связующее устройство RocketMQ: модель программирования

Подписывайтесь на свежие статьи, приглашаю обратить внимание на мой публичный номер

微信公众号