Inventory MQ: неглубокая запись очереди сообщений RabbitMQ

Java
Inventory MQ: неглубокая запись очереди сообщений RabbitMQ

Общая документация:Каталог статей
Github : github.com/black-ant

предисловие

В этой статье я постараюсь рассказать о чуть более глубоком техническом моменте RabbitMQ, надеясь объяснить его ясно и ясно.

Введение RabbitMQ может относиться кОбзор MQ, я не буду здесь слишком подробно описывать, давайте поговорим о четырех из них по очереди.

1. Основное использование

1.1 Детали конфигурации

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

1.2 Потребители

@RabbitListener(bindings = @QueueBinding(value = @Queue("DirectA"),key = "ONE",exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)))
public void processA(String message) {
    logger.info("------> DirectA 发送接收成功  :{}<-------", message);
}

1.3 Производители

rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

image.png

На картинке видно, что DirectExchange на самом деле привязан к трем очередям, поэтому мои многочисленные потребители получили сообщения.

1.4 Четыре режима передачи

fanout => сообщения, отправленные на этот обмен, отправляются во все связанные очереди

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("FanoutA"),
            key = "ONE",
            exchange = @Exchange(name = "FanoutExchange", type = ExchangeTypes.FANOUT)
    ))
    public void processA(String message) {
        logger.info("------> FanoutA 发送接收成功  :{}<-------", message);
    }

сообщение direct => направляется в ту же очередь с BindingKey и RoutingKey

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("DirectA"),
            key = "ONE",
            exchange = @Exchange(name = "DirectExchange", type = ExchangeTypes.DIRECT)
    ))
    public void processA(String message) {
        logger.info("------> DirectA 发送接收成功  :{}<-------", message);
    }

top => BindingKey использует нечеткое соответствие */#

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("TopicA"),
            key = "ONE.*",
            exchange = @Exchange(name = "TopicExchange", type = ExchangeTypes.TOPIC)
    ))
    public void processA(String message) {
        logger.info("------> TopicA 发送接收成功  :{}<-------", message);
    }

заголовок => соответствует атрибуту заголовка содержимого сообщения

1.5 Низкоуровневое использование

1.5.1 Создание соединения для отправки сообщения

// 以下是一个非Spring发送的完整流程 ,在 SpringBoot 中 , 这些流程被代理了
ConnectionFactory connectionFactory = new CachingConnectionFactory();

AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));

AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");

String foo = (String) template.receiveAndConvert("myqueue");

1.5.2 Создание привязки


// Direct
new Binding(someQueue, someDirectExchange, "foo.bar");

// Topic
new Binding(someQueue, someTopicExchange, "foo.*");

// Fanout
new Binding(someQueue, someFanoutExchange);

// 

1.5.3 Фабрика соединений

RabbitTemplate предоставляет три фабрики соединений:

  • PooledChannelConnectionFactory
    • Обычно используемая фабрика соединений на основе пула соединений (commons-pool2)
    • Поддерживает простое подтверждение издателя
  • ThreadChannelConnectionFactory
    • Операции области необходимы для обеспечения строгого порядка сообщений.
    • Поддерживает простое подтверждение издателя
    • Эта фабрика гарантирует, что все операции в одном потоке используют один и тот же канал.
  • CachingConnectionFactory
    • Через CacheMode можно открыть несколько подключений (общие подключения, отличные от пулов подключений).

    • Может быть подтверждено соответствующим издателем

    • Поддерживает простое подтверждение издателя

PS: для первых двух требуется пакет spring-rabbit

Три пула соединений обеспечивают различные функции, которые мы можем выбрать и настроить в соответствии с нашими собственными проектами.

Как создается PooledChannelConnectionFactory

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

// 我们来看一下源码 : 

C- ThreadChannelConnectionFactory
    C- ConnectionWrapper
        private final ThreadLocal<Channel> channels = new ThreadLocal<>();
        private final ThreadLocal<Channel> txChannels = new ThreadLocal<>();
        ?- 内部主要通过2个ThreadLocal 来存储 Channel , 之所以是2个其中一个是为事务通道准备的
        
        
C- PooledChannelConnectionFactory
    C- ConnectionWrapper
        private final ObjectPool<Channel> channels;
        private final ObjectPool<Channel> txChannels;
        ?- ObjectPool 是 CommonPool 的组件


// 

CachingConnectionFactory создает новое соединение

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();


// PS : 注意其中的构造器 ,其提供了不限于一种的构造方式
public CachingConnectionFactory(@Nullable String hostname)
public CachingConnectionFactory(int port)
public CachingConnectionFactory(@Nullable String hostNameArg, int port)
public CachingConnectionFactory(URI uri)
public CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory)
private CachingConnectionFactory(com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory,
			boolean isPublisherFactory)
// 以及提供了一个静态构造器
private static com.rabbitmq.client.ConnectionFactory newRabbitConnectionFactory()


// 添加自定义客户端连接属性
?- 访问基础连接工厂 , 设置自定义客户机属性
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");


// 再深入一点 : 
1 内部类 CacheMode : 缓存模式 , 包括 CHANNEL , CONNECTION

// CHANNEL 相关属性
- connectionName : 由 ConnectionNameStrategy 生成的连接的名称
- channelCacheSize : 当前配置的允许空闲的最大通道
- localPort : 连接的本地端口
- idleChannelsTx : 当前空闲的事务通道数(缓存)
- idleChannelsNotTx : 当前空闲的非事务性通道的数量(缓存)
- idleChannelsTxHighWater : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater : 非事务性通道的最大数目已并发空闲(缓存)

// CONNECTION 相关属性
- connectionName:<localPort> : 由 ConnectionNameStrategy 生成的连接的名称
- openConnections : 表示到代理的连接的连接对象的数量
- channelCacheSize : 当前配置的允许空闲的最大通道
- connectionCacheSize : 允许空闲的当前配置的最大连接
- idleConnections : 当前空闲的连接数
- idleConnectionsHighWater : 已并发空闲的最大连接数
- idleChannelsTx:<localPort> : 此连接当前空闲的事务通道数(缓存)
- idleChannelsNotTx:<localPort> : 此连接当前空闲的非事务性通道数(缓存)
- idleChannelsTxHighWater:<localPort> : 已并发空闲的事务通道的最大数量(缓存)
- idleChannelsNotTxHighWater:<localPort> : 非事务性通道的最大数目已并发空闲(缓存)


1.5.4 Отправка сообщения

void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;

// 同样 , 在业务中我们可以使用更合理的发送方式
RabbitTemplate template = new RabbitTemplate(); 
template.setRoutingKey("queue.helloWorld"); 

// Template 同样可以定制 , 例如
template.setConfirmCallback(...);

// 这里可以定制配置
MessageProperties properties = new MessageProperties();
//... 省略
template.send(new Message("Hello World".getBytes(), properties));




// 同样 , 可以对 Message 进行详细处理
Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
public static MessageBuilder withBody(byte[] body) 
    ?- 生成器创建的消息有一个主体,它是对参数的直接引用
    - return new MessageBuilder(body);
public static MessageBuilder withClonedBody(byte[] body) 
    ?- Arrays 新建了一个数组
    - return new MessageBuilder(Arrays.copyOf(body, body.length));
public static MessageBuilder withBody(byte[] body, int from, int to) 
    - return new MessageBuilder(Arrays.copyOfRange(body, from, to));
public static MessageBuilder fromMessage(Message message) 
    - return new MessageBuilder(message);
public static MessageBuilder fromClonedMessage(Message message) 
    - byte[] body = message.getBody();
    - return new MessageBuilder(Arrays.copyOf(body, body.length), message.getMessageProperties());
    
    

// MessageProperties 扩展方式
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
    
public static MessagePropertiesBuilder newInstance() 
    ?-  使用默认值初始化新的消息属性对象
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 
    ?- return new MessagePropertiesBuilder(properties) : 通过传入的 properties 创建
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 
    ?- 内部进行了 builder.copyProperties(properties);

1.5.5 Получение сообщений

Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;

Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

1.5.6 Включить аннотацию

@EnableRabbit

2. Анализ исходного кода

2.1 Базовый интерфейс


// Exchange 接口表示 AMQP Exchange,即消息生成器发送到的内容。
// 代理虚拟主机中的每个 Exchange 都有唯一的名称以及一些其他属性
public interface Exchange {
    String getName();
    // direct、 topic、 fanout 和 header
    String getExchangeType();
    boolean isDurable();
    boolean isAutoDelete();
    Map<String, Object> getArguments();
}

// Queue 类表示消息使用者从中接收消息的组件。
// 与各种 Exchange 类一样,我们的实现旨在作为这种核心 AMQP 类型的抽象表示
public class Queue  {
    private final String name;
    private volatile boolean durable;
    private volatile boolean exclusive;
    private volatile boolean autoDelete;
    private volatile Map<String, Object> arguments;
    public Queue(String name) {
        this(name, true, false, false);
    }
}

2.2 Краткое описание основных классов

Ранее я упоминал об основном использовании загрузки без Spring, которая в основном включаетConnectionFactory ,AmqpAdmin ,Queue, в этой колонке нужно говорить только о связанных классах предварительной обработки

2.2.1 Серия заводских соединений

Сначала рассмотрим эти три класса:

ConnectionFactory :

ConnectionFactory здесь в основном CachingConnectionFactory , который похож на кешированный пул соединений, CHANNEL (по умолчанию) возвращает одно и то же соединение из всех вызовов createConnection() и игнорирует вызовы Connection.close() и кэширует CHANNEL

По умолчанию кэшируется только один канал, а каналы для дальнейших запросов будут создаваться и обрабатываться по мере необходимости.

// 默认缓存大小
private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;
private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";
// channel 超时时间
private static final int CHANNEL_EXEC_SHUTDOWN_TIMEOUT = 30;


// 核心类
private Channel getChannel(ChannelCachingConnectionProxy connection, boolean transactional) {
    Semaphore permits = null; // 发现了一个神奇的工具类 , 可以理解为共享锁
    if (this.channelCheckoutTimeout > 0) {
        permits = obtainPermits(connection);
    }
    LinkedList<ChannelProxy> channelList = determineChannelList(connection, transactional);
    ChannelProxy channel = null;
    if (connection.isOpen()) {
        channel = findOpenChannel(channelList, channel);
    }
    if (channel == null) {
        try {
        
            channel = getCachedChannelProxy(connection, channelList, transactional);
        }catch (RuntimeException e) {
            //Semaphore 的释放 , 这是一个多线程信号量工具
        }
       return channel;
}


private ChannelProxy findOpenChannel(LinkedList<ChannelProxy> channelList,ChannelProxy channelArg) {
    ChannelProxy channel = channelArg;
    synchronized (channelList) {
        while (!channelList.isEmpty()) {
            // 此处训话获取 channelList 中第一个 channel , 并且从集合中移除
            channel = channelList.removeFirst();
                if (channel.isOpen()) {
                    break;
                } else {
                    cleanUpClosedChannel(channel);
                    channel = null;
                }
            }
        }
        return channel;
}

// CachingConnectionFactory 

2.2.2 Серия элементов структуры

2.2.3 Серия публикаций участников (админ)

AmqpAdmin

AmqpAdmin — это интерфейс, основным классом реализации которого является RabbitAdmin, реализующий переносимые операции управления AMQP.Очереди могут быть автоматически объявлены и связаны, пока RabbitAdmin существует в контексте приложения.

// 方法 declareExchanges : 
// 声明了一个 exchange , 其配置主要来源于注解或配置
Map<String, Object> arguments = exchange.getArguments();
channel.exchangeDeclare(
    exchange.getName(), 
    DELAYED_MESSAGE_EXCHANGE, 
    exchange.isDurable(),
    exchange.isAutoDelete(), 
    exchange.isInternal(), 
    arguments
);

//-----------------
// 方法 : declareQueues 声明了一个 Queue
// 方法 : declareBindings 声明了绑定关系

Позже видно, что при сканировании аннотации в начале соответствующая инициализация будет завершена через этот класс

RabbitAdmin

RabbitAdmin — относительно низкоуровневый класс, который в основном вызывается в этих местах

0001.jpg

Ядром RabbitAdmin является логика множественных объявлений,


// 节点一 : 先建立 Channel , 再通过 Callback 方法回调建立 Exchange
M- declareExchange(final Exchange exchange)
    // 注意 , 这里 excute 的参数是一个 ChannelCallback , 其目的是为了channle 建立后的回调
    this.rabbitTemplate.execute(channel -> {
        declareExchanges(channel, exchange);
        return null;
    });
    
M- declareExchanges
    ?- 该方法允许一次性传入多个 Exchange 
    - 先准备arguments 属性
    - 调用 channel.exchangeDeclare 实现声明操作

    
C- RabbitTemplate
    M- execute : RabbitTemplate 通过 retryTemplate 进行重试调用
        ?- 注意 doExecute , Channel 的建立主要还是在 RabbitTemplate 中完成的
        ?- 在 doExecute 中再 Callback 之前 RabbitAdmin 里面的操作 , 类似于一个回调
        - return this.retryTemplate.execute(
            (RetryCallback<T, Exception>) context -> doExecute(action, connectionFactory),
            (RecoveryCallback<T>) this.recoveryCallback);
            
// 节点二 : 删除 Exchange
M- deleteExchange(final String exchangeName) : 通过名字删除 Exchange
    - 先判断是否为默认 Exchange , 名称为""
    - 再直接通过channel.exchangeDelete 删除
    

// 节点三 : 建立 Queue , 和Exchange 同理
M- declareQueue(final Queue queue)
    - 同理 , 先建立 Channel 后回调
    - 调用  declareQueues(channel, queue); 完成Queue 声明
        - channel.queueDeclare : 通过 channel 对象完成 Queue 声明
        
M- declareQueue()
    ?- 声明一个服务器命名的独占的、autodelete的、非持久的队列
    ?- 这里是没有传入 Queue 对象的 , 直接创建一个 Chhenl , 然后创建一个 Queue
    -  this.rabbitTemplate.execute(Channel::queueDeclare)
        
M- deleteQueue(final String queueName)
    ?- 同样的 , 先创建一个 Channel , 再删除
    
M-  deleteQueue(final String queueName, final boolean unused, final boolean empty)
    ?- 更详细的删除方式 , 包括是否使用 , 是否为空来判断是否删除
    
M- purgeQueue 
    ?- 清除队列,可以选择不等待清除发生
    
    
// 节点四 : Binding 处理
M- declareBinding
    - 先建立 Channel 
    - declareBindings(channel, binding);
        - channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
        - channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
        
        
        
M- removeBinding(final Binding binding)
    - channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
    - channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
                                                

M- QueueInformation getQueueInfo(String queueName)
    - DeclareOk declareOk = channel.queueDeclarePassive(queueName);
    - return new QueueInformation(declareOk.getQueue(), declareOk.getMessageCount(),declareOk.getConsumerCount());



2.2.4 событие серия событий

В Кролике в основном есть следующие события:

  • AsyncConsumerStartedEvent: когда потребитель начинает
  • AsyncConsumerRestartedEvent: перезапустить потребителя после сбоя
  • AsyncConsumerTerminatedEvent: потребительские остановки
  • AsyncConsumerStoppedEvent: Потребитель остановлен (используется SimpleMessageListenerContainer)
  • ConsumeOkEvent: когда ConsumerOk получен от брокера
  • ListenerContainerIdleEvent :
  • MissingQueueEvent: потерянная очередь

2.2.5 Обработка транзакции транзакции

Основное различие между этими двумя методами заключается в том, что один заключается в предоставлении внешней транзакции (путем установки channelTransacted + TransactionManager), а другой — внутренней логики (путем установки channelTransacted + объявление транзакции (например, @Transaction)).

метод первый:
И в RabbitTemplate, и в SimpleMessageListenerContainer есть флаг channelTransacted, если флаг равен true, он сообщает платформе использовать канал транзакции и завершать все операции (отправить или получить) путем фиксации или отката (в зависимости от результата), и там является сигналом исключения Выполнить откат.

// 这种方式的体现主要在 Consumer 中, 以BlockingQueueConsumer 为例

// Step 1 : 创建 Consumer 的时候传入 isChannelTransacted
consumer = new BlockingQueueConsumer(getConnectionFactory(), getMessagePropertiesConverter(),
    this.cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount,
    isDefaultRequeueRejected(), getConsumerArguments(), isNoLocal(), isExclusive(), queues);

// Step 2 : 最终在相关方法中调用
M- rollbackOnExceptionIfNecessary
    if (this.transactional) -> RabbitUtils.rollbackIfNecessary(this.channel);
    
    
// 事务的声明 : 
1 例如可以使用Spring 事务模型

// 事务的原理 : 
TODO 

Способ второй:
Предоставляет внешнюю транзакцию, содержащую реализацию Spring PlatformTransactionManager в качестве контекста для текущей операции. Если транзакция уже выполняется, когда инфраструктура отправляет или получает сообщение, а флаг channelTransacted имеет значение true, фиксация или откат транзакции обмена сообщениями будет отложена до конца текущей транзакции. Если флаг channelTransacted равен false, семантика транзакций не применяется к операциям обмена сообщениями (автоматически регистрируется)

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(rabbitTransactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
}

// 深入一下用法 : 
C- SimpleMessageListenerContainer
    M- doReceiveAndExecute
        ?- 其中在一个 Catch 中能看到对 transactionManager 的调用
        ... catch (Exception ex) {
            - 判断是否有transactionManager
            - 判断是否开启了Callback
                RabbitResourceHolder resourceHolder = (RabbitResourceHolder)TransactionSynchronizationManager.getResource(getConnectionFactory());
                if (resourceHolder != null) {
                    consumer.clearDeliveryTags();
                } else {
                    consumer.rollbackOnExceptionIfNecessary(ex);
                }

2.3 Подробный анализ аннотаций RabbitListener

На самом деле, на протяжении всего процесса мы не проявляли инициативы по установлению Очереди или Обмена, так что же нам дала аннотация?

Давайте сначала посмотрим, какие методы представлены в аннотациях:

  • id :
  • containerFactory :класс контейнерной фабрики
  • queues :объявить очереди
  • queuesToDeclare :Если в контексте приложения есть RabbitAdmin, очередь будет использовать привязку по умолчанию к брокеру.
  • exclusive :
  • priority :
  • admin :Класс AmqpAdmin
  • bindings :система привязки
  • group :
  • returnExceptions :Возвращает исключение отправителю, исключение помещается в объект RemoteInvocationResult.
  • errorHandler :обработчик обработки исключений
  • concurrency :Установите параллелизм контейнера прослушивателя для этого прослушивателя
  • autoStartup :следует ли запускать при запуске ApplicationContext
  • executor :Устанавливает имя компонента-исполнителя задачи для этого контейнера слушателя; переопределяет любой исполнитель, установленный на фабрике контейнеров
  • **ackMode :**AcknowledgeMode
  • replyPostProcessor :

автоматическое сохранение

 @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )

объявляет и связывает анонимную (эксклюзивную, автоматически удаляемую) очередь

// 这里 Queue 没有设置任何东西
  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )

Аннотация для получения заголовка

@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType)

Прослушивание нескольких очередей

@RabbitListener(queues = { "queue1", "queue2" } )

// 另外还可以使用 SPEL 表达式
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )

ОтправитьЧтобы отправить дополнительное сообщение

  • @SendTo : полученное сообщение будет отправлено в указанное место назначения маршрутизации, и все пользователи, подписавшиеся на сообщение, смогут его получить, что является широковещательной рассылкой.
  • @SendToUser : назначение сообщения обрабатывается UserDestinationMessageHandler, который направляет сообщение в место назначения, соответствующее отправителю.
@SendTo("#{environment['my.send.to']}")


// 使用Bean 的方式
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}

@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

// SPEL
@SendTo("!{'some.reply.queue.with.' + result.queueName}")

// Spring Enviroment
@SendTo("#{environment['my.send.to']}")

Определить тип ответа

@RabbitListener(queues = "q1", messageConverter = "delegating",
        replyContentType = "application/json")

Основной класс один: RabbitListenerAnnotationBeanPostProcessor

Он будет вызываться через созданный контейнер прослушивателя сообщений AMQP в соответствии с параметрами аннотации.RabbitListenerContainerFactory, автоматически обнаруживая любые экземпляры RabbitListenerConfigurer в контейнере, что позволяет использовать настраиваемые реестры, фабрики контейнеров по умолчанию или детальный контроль над регистрацией конечных точек.

// RabbitListenerAnnotationBeanPostProcessor


// Step Start : postProcessAfterInitialization
// postProcessAfterInitialization 来自于 BeanPostProcessor , 他会在自定义初始化之前调用
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    // 获取 ServletWebServerFactoryConfiguration
    Class<?> targetClass = AopUtils.getTargetClass(bean);
    // 获取每个类的 RabbitListener Metadata
    final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
    for (ListenerMethod lm : metadata.listenerMethods) {
        for (RabbitListener rabbitListener : lm.annotations) {
            // 如果存在注解 , 则处理
            processAmqpListener(rabbitListener, lm.method, bean, beanName);
        }
    }
    if (metadata.handlerMethods.length > 0) {
        processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
    }
    return bean;
}



// Step : findListenerAnnotations 扫描所有的注解
private RabbitListenerAnnotationBeanPostProcessor.TypeMetadata buildMetadata(Class<?> targetClass) {
        // 此处会对每个类都检测是否存在 @RabbitListener
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<RabbitListenerAnnotationBeanPostProcessor.ListenerMethod> methods = new ArrayList<>();
        final List<Method> multiMethods = new ArrayList<>();
        ReflectionUtils.doWithMethods(targetClass, method -> {
            Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
            if (listenerAnnotations.size() > 0) {
                // 如果存在注解 , 则添加到集合中
                methods.add(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod(method,
                        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
            }
            if (hasClassLevelListeners) {
                RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                if (rabbitHandler != null) {
                    multiMethods.add(method);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return RabbitListenerAnnotationBeanPostProcessor.TypeMetadata.EMPTY;
        }
        // 构建一个 TypeMetadata
        return new RabbitListenerAnnotationBeanPostProcessor.TypeMetadata(
                methods.toArray(new RabbitListenerAnnotationBeanPostProcessor.ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
}

Как вы видели выше, processAmqpListener в конечном итоге будет вызываться в методе postProcessAfterInitialization для обработки аннотаций, давайте взглянем на метод processAmqpListener.

// processAmqpListener
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
    // 如果是代理 , 返回最终代理的方法
    Method methodToUse = checkProxy(method, bean);
    // 构建了一个 MethodRabbitListenerEndpoint , 该单元用于处理这个端点的传入消息
    MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
    endpoint.setMethod(methodToUse);
    // Listener 注解流程
    processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}


// 我们继续深入 , 很长的一段 ,我们省略掉其中无意义的一部分 :
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
    	Object target, String beanName) {

    // endpoint 注入属性 
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(rabbitListener));
    // 这里仅仅只是对 QueueName 进行校验和解析
    endpoint.setQueueNames(resolveQueues(rabbitListener));
    // 处理监听器
    endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
    endpoint.setBeanFactory(this.beanFactory);
    //异常处理
    endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
    
    Object errorHandler = resolveExpression(rabbitListener.errorHandler());
    if (errorHandler instanceof RabbitListenerErrorHandler) {
    	endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
    }
    else if (errorHandler instanceof String) {
    	String errorHandlerBeanName = (String) errorHandler;
    	if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
    	}
    }
    else {
    	throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
        	+ errorHandler.getClass().toString());
    }
    String group = rabbitListener.group();
    if (StringUtils.hasText(group)) {
    	Object resolvedGroup = resolveExpression(group);
    	if (resolvedGroup instanceof String) {
			endpoint.setGroup((String) resolvedGroup);
    	}
    }
    String autoStartup = rabbitListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
    	endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
    }

    endpoint.setExclusive(rabbitListener.exclusive());
    String priority = resolve(rabbitListener.priority());
    if (StringUtils.hasText(priority)) {
		endpoint.setPriority(Integer.valueOf(priority));
    }

    resolveExecutor(endpoint, rabbitListener, target, beanName);
    resolveAdmin(endpoint, rabbitListener, target);
    // ACK 模式我们回头再看看
    resolveAckMode(endpoint, rabbitListener);
    resolvePostProcessor(endpoint, rabbitListener, target, beanName);
    RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

    this.registrar.registerEndpoint(endpoint, factory);
}

// 注册 Endpoint
public void registerEndpoint(RabbitListenerEndpoint endpoint,
			@Nullable RabbitListenerContainerFactory<?> factory) {
    AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { 
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, // NOSONAR never null
            resolveContainerFactory(descriptor), true);
        }else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

Посмотрите, как работает RabbitTemplate


// Step 1 : RabbitListenerEndpointRegistry Listener 启动
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        startIfNecessary(listenerContainer);
    }
}

// 问题一 : getListenerContainers 的来源 >>
RabbitListenerEndpointRegistry  -->  Map<String, MessageListenerContainer>
// 注意 , 这里就和上文的扫描匹配上了 , 由 registerEndpoint 调用
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
				boolean startImmediately) {

    String id = endpoint.getId();
    synchronized (this.listenerContainers) {
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        if (this.contextRefreshed) {
            container.lazyLoad();
        }
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}

// 扫描时 , 会将 MessageListenerContainer 放入 Map , 用于后续使用
// Step 2 : RabbitListenerEndpointRegistry Listener 运行
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        // 主要是 SimpleMessageListenerContainer
        // 继承了 AbstractMessageListenerContainer , start 在该类中
        listenerContainer.start();
    }
}

// Step 3 : start 方法

configureAdminIfNeeded();
// 校验 Queue 情况 ,主要是调用 RabbitAdmin , 校验失败的会 Channel shutdown
//  Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'HeaderExchange' in vhost '/': received 'direct' but current is 'headers', class-id=40, method-id=10)
checkMismatchedQueues();
    - 1 判断Exchange 情况并且输出日志 : !isDurable / isAutoDelete / isInfoEnabled
    - 2 判断 Queue 情况并且输出日志 : !isDurable / isAutoDelete / isExclusive / isInfoEnabled(开启日志)
    // 注意 , 这里就是之前苦苦寻找声明的地方 , 也可以理解为这里就开始连接了
    - 3 declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
        - channel.exchangeDeclare(exchange.getName(), DELAYED_MESSAGE_EXCHANGE, exchange.isDurable(),exchange.isAutoDelete(), exchange.isInternal(), arguments);
    - 4 declareQueues(channel, queues.toArray(new Queue[queues.size()]));
        - channel.queueDeclare(queue.getName(), queue.isDurable(),queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());
    - 5 declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
        - channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),binding.getArguments());
// 最后调用了一下子类的 doStart , 用于后续扩展
doStart();

Как вы, вероятно, можете видеть из вышеизложенного, как аннотация регистрирует сообщение, давайте посмотрим на процесс отправки и получения сообщения:

2.4 Отправка сообщения

Обычно мы используем следующий код для отправки сообщения:

rabbitTemplate.convertAndSend("DirectExchange", "ONE", "发送消息 :" + msg);

Давайте посмотрим, что происходит в этом:


// 中间有一个 Boolean 型 ,我们用伪代码表示
Boolean item1 = RabbitTemplate.this.returnCallback != null|| (correlationData != null && StringUtils.hasText(correlationData.getId()))
Boolean item2 = RabbitTemplate.this.mandatoryExpression.getValue(RabbitTemplate.this.evaluationContext, message, Boolean.class)
Boolean mandatory =  item1 && item2 ;
            

@Override
public void send(final String exchange, final String routingKey,
                    final Message message, @Nullable final CorrelationData correlationData)
                    throws AmqpException {
    execute(channel -> {
        // 核心方法 , 发送消息
        doSend(channel, exchange, routingKey, message,mandatory,correlationData);
        return null;
    }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

// doSend 中我们同样去掉无关的
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

    // 这里准备了 Exchange 参数 和 routingKey 参数
    String exch = exchangeArg;
    String rKey = routingKeyArg;

    // 中间一大段都是对 MessageProperties 进行处理 , MessageProperties 存在 messageToUse 中
   
    sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
    if (isChannelLocallyTransacted(channel)) {
        RabbitUtils.commitIfNecessary(channel);
    }
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,Message message) throws IOException {

    BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
    // 这里就是常规的 RabbitMQ Client 操作了 , 到这里 发送就结束了
    // com.rabbitmq.client.impl.ChannelN
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    
    // basicPublish 核心如下 , channle 的逻辑我们可以未来再深入
    AMQCommand command = new AMQCommand((new Builder()).exchange(exchange).routingKey(routingKey).mandatory(mandatory).immediate(immediate).build(), props, body);
    this.transmit(command);

    
}

2.5 Получение сообщений

Прием сообщений должен осуществляться через классы прокси, найдем конкретный класс прокси:


// Step 1: SimpleMessageListenerContainer#run
// 其中有这样一段代码
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
    mainLoop();
}

// Step 2 : 持续监听 ,往里面追溯 , 可以发现有一段这样的代码
M- doReceiveAndExecute
try {
    executeListener(channel, message);
}

-> doExecuteListener
-> invokeListener(channel, message);
-> this.proxy.invokeListener(channel, data) // 到这里实际上还是看不到实际的处理类

// 一路debug , 最终会到这个方法中
protected void actualInvokeListener(Channel channel, Object data) {
    Object listener = getMessageListener();
    if (listener instanceof ChannelAwareMessageListener) {
        doInvokeListener((ChannelAwareMessageListener) listener, channel, data);
    }else if (listener instanceof MessageListener) {
			boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
			if (bindChannel) {
				RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
				resourceHolder.setSynchronizedWithTransaction(true);
				TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
						resourceHolder);
			}
			try {
				doInvokeListener((MessageListener) listener, data);
			}
			finally {
				if (bindChannel) {
					// unbind if we bound
					TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
				}
			}
		}
		else if (listener != null) {
			throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
					+ listener);
		}
		else {
			throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
		}
	}


2.6 Мониторинг сообщений

RabbitMQ предоставляет два прослушивателя сообщений: SimpleMessageListenerContainer (SMLC) и DirectMessageListenerContainer (DMLC), которые имеют следующие свойства:

  • ackTimeout: после установки messagesPerAck это значение используется как альтернатива ack, которое будет таким же, как время после последнего ack для обеспечения подлинности
  • acknowledgeMode: Существует три типа НЕТ, РУЧНОЙ, АВТО.
    • NONE : подтверждение не было отправлено
    • ВРУЧНУЮ: слушатель должен подтвердить все сообщения, вызвав Channel.basicAck().
    • AUTO : контейнер автоматически подтверждает сообщение, если MessageListener не выдает исключение
  • adviceChain: Массив уведомлений AOP для применения к выполнению прослушивателя.
  • afterReceivePostProcessors: Массив экземпляров MessagePostProcessor для вызова перед вызовом прослушивателя.
  • alwaysRequeueWithTxManagerRollback: true означает всегда запрашивать сообщения при откате при настройке менеджера транзакций.
  • autoDeclare: Если установлено значение true (по умолчанию), переобъявить все объекты AMQP (очереди, обмены) с помощью RabbitAdmin, если контейнер обнаружит, что хотя бы одна очередь отсутствует во время запуска (вероятно, потому, что это очередь с автоматическим удалением или очередь с истекшим сроком действия), привязка ) , но если очередь по какой-либо причине потеряна, повторная декларация продолжится.
  • autoStartup: указывает, что контейнер должен запускаться при запуске ApplicationContext, по умолчанию true.
  • batchSize: при использовании с acceptMode.AUTO контейнер пытается пакетировать это количество сообщений перед их отправкой.
  • batchingStrategy: стратегия пакетной обработки
  • channelTransacted: следует ли подтверждать все сообщения в транзакции (вручную или автоматически)
  • concurrency: количество одновременных потребителей на слушателя
  • concurrentConsumers: количество одновременных потребителей, изначально запущенных каждым слушателем
  • connectionFactory: ссылка на ConnectionFactory
  • consecutiveActiveTrigger: Минимальное количество последовательных сообщений (меньше этого числа не будет отправлен тайм-аут приема)
  • consumerBatchEnabled: включить пакетные сообщения
  • consumerStartTimeout: Время ожидания запуска потребительского потока в миллисекундах.
  • consumerTagStrategy: Реализация стратегии ConsumerTagStrategy
  • consumersPerQueue: количество потребителей, созданных для настроенной очереди.
  • errorHandler: ссылка на обработчик ошибок
  • exclusive: имеет ли единственный потребитель в этом контейнере эксклюзивный доступ к очереди
  • messagesPerAck: количество сообщений для получения между acks
  • maxConcurrentConsumers: максимальное количество одновременных потребителей для запуска по требованию
  • noLocal: установите значение true, чтобы отключить доставку сообщений, опубликованных на том же канале соединения, от сервера к потребителю.
  • rabbitAdmin: реализация RabbitAdmin
  • receiveTimeout: максимальное время ожидания каждого сообщения
  • shutdownTimeout: тайм-аут закрытия
  • startConsumerMinInterval: начать минимальный интервал потребления
  • stopConsumerMinInterval: Минимальный интервал закрытия потребителя
  • transactionManager: Управление транзакциями

Функция слушателя

Отсюда мы, вероятно, можем узнать общие функции слушателя:

  • Очередь прослушивания (несколько очередей)
  • автоматический старт
  • Функция автоматического запроса
  • Установить характеристики транзакции, диспетчер транзакций, атрибуты транзакций, параллелизм транзакций, открытая транзакция, откат сообщения
  • Установите количество потребителей, максимальное и минимальное количество, атрибуты пакетного потребления и т. д.
  • Установите модуль подтверждения и подтверждения, возвращаться ли в очередь, errorHandler
  • Установить стратегию генерации потребительских тегов, эксклюзивный режим, потребительские атрибуты
  • Установите конкретные преобразователи, преобразования и т. д.
  • и другие общие черты

DirectMessageListenerContainer

  • Каждый потребитель каждой очереди использует отдельный канал
  • Параллелизм контролируется rabbitClient
// 配置方式
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("queueName1","queueName2");
        container.setMessageListener((MessageListener) message -> {
           //....
        });
        return container;
}

Взгляните на использование из исходного кода:

TODO

3. Точки углубленных знаний

3.1 Настройка SSL

Настройте свойства, связанные с SSL, настроив userSSL в RabbitConnectionFactoryBean, и обратите внимание на настройку следующих свойств:

  • private Resource sslPropertiesLocation;
  • private String keyStore;
  • private String trustStore;

За подробностями обращайтесь к официальному документу 4.1.2 @docs.spring.IO/spring-AM-QP…

3.2 Настройка кластера

AddressShuffleMode.RANDOM означает случайную установку порядка подключения, по умолчанию опрос выполняется один за другим.

  • NONE : не менять адреса до или после открытия соединения; пробовать соединения в фиксированном порядке
  • RANDOM: случайным образом перемешивайте адреса перед открытием соединений; пробуйте новые последовательные соединения
  • INORDER : перетасуйте адреса после открытия соединения, переместите первый адрес в конец (похоже на то, как вырезать карты ..)
@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.RANDOM);
    return ccf;
}


// Step 1 : 这个 Address 最终会被设置到 address 属性 同时内部 publisherConnectionFactory 也会尝试设置
Address[] addressArray = Address.parseAddresses(addresses);
this.addresses = new LinkedList<>(Arrays.asList(addressArray));
this.publisherConnectionFactory.setAddresses(addresses);

// Step 2 : 模式判断
private synchronized com.rabbitmq.client.Connection connectAddresses(String connectionName)
			throws IOException, TimeoutException {

    List<Address> addressesToConnect = new ArrayList<>(this.addresses);
    
    // RANDOM 模式处理
    if (addressesToConnect.size() > 1 && AddressShuffleMode.RANDOM.equals(this.addressShuffleMode)) {
        Collections.shuffle(addressesToConnect);
    }

    com.rabbitmq.client.Connection connection = this.rabbitConnectionFactory.newConnection(this.executorService,
				addressesToConnect, connectionName);
    // INORDER 模式             
    if (addressesToConnect.size() > 1 && AddressShuffleMode.INORDER.equals(this.addressShuffleMode)) {
        this.addresses.add(this.addresses.remove(0));
    }
    return connection;
}


// Step 3 : 在 com.rabbitmq.client.ConnectionFactory 中 newConnection 


3.3 Настройка фабрики соединений

public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

3.4 Настройка общих функций RabbitTemplate

3.4.1 Функция повтора

RetryTemplate не является эксклюзивным для Rabbit, это инструмент в пакете org.springframework.retry.support.

@Bean
public RabbitTemplate rabbitTemplate() {

    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    
    // 使用 RetryTemplate , 这个类是 org.springframework.retry.support 的类
    RetryTemplate retryTemplate = new RetryTemplate();
    
    // 回退功能 , 归属于类 org.springframework.retry.backoff
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    
    // 重试 + 回退
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    
    return template;
}

Используйте метод обратного вызова


retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

Список исходного кода



3.4.2 Конфигурация обнаружения аномалий

Этот метод не рекомендуется в официальных документах, ведь обработка транзакций очень ресурсоемка.

// 先开始 事务处理
template.setChannelTransacted(true);

// 再在事务的方法中检测异常
txCommit()


3.4.3 Установить обратный вызов

  • RabbitTemplate поддерживает только один ConfirmCallback
template.setConfirmCallback(new MySelfConfirmCallback());

// ConfirmCallback 实现类
public class MySelfConfirmCallback implements RabbitTemplate.ConfirmCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info("------> CorrelationData is :{} <-------", correlationData);
    }
}

// CorrelationData 是客户机在发送原始消息时提供的对象
// ack : ture(ack) / false (nack)

// CorrelationData 提供了Future 接口用户异步获取数据
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
cd1.getFuture().get(10, TimeUnit.SECONDS).isAck();

Углубитесь в исходный код

public SettableListenableFuture<Confirm> getFuture() {
    return this.future;
}

C- SettableListenableFuture
    - private final SettableTask<T> settableTask = new SettableTask<>()
        ?- 其中包含一个 SettableTask 对象  ,该对象中有个 Thread

private static class SettableTask<T> extends ListenableFutureTask<T> {
    @Nullable
    private volatile Thread completingThread;
}


    
C- Confirm
    - private final boolean ack;
    - private final String reason;

3.4.5 Настройка возврата

  • Установите CachingConnectionFactory#publisherReturns в true
  • Набор шаблонов RabbitTemplate ReturnsCallback
template.setReturnsCallback(new SelfReturnsCallback());

// 设置 ReturnCallBack
public class SelfReturnsCallback implements RabbitTemplate.ReturnsCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void returnedMessage(ReturnedMessage returned) {

    }
}

ReturnedMessage содержит следующие свойства:

  • message : само возвращаемое сообщение
  • answerCode : код, указывающий причину возврата
  • answerText : причина возврата
  • exchange : биржа, на которую было отправлено сообщение
  • routingKey : ключ маршрутизации для использования

Каждый RabbitTemplate поддерживает только один ReturnsCallback.

3.4.6 Независимое подключение

Установив свойство useblisherconnection, реализуйте соединение, отличное от того, которое используется контейнером прослушивателя, когда это возможно.

Таким образом, вы избегаете, когда производитель блокируется по какой-либо причине, блокируется потребитель

template.setUsePublisherConnection(true);

// 连接工厂为此维护第二个内部连接工厂
// 默认情况下,它与主工厂的类型相同,但是如果您希望使用不同的工厂类型发布,可以设置明确的类型。

// 源码一览 :
M- doSendAndReceiveWithDirect
if (this.usePublisherConnection && connectionFactory.getPublisherConnectionFactory() != null) {
    connectionFactory = connectionFactory.getPublisherConnectionFactory();
}


3.5 Событие Событие

  • AsyncConsumerStartedEvent : когда запускается потребитель
  • AsyncConsumerRestartedEvent: перезапустите потребитель после сбоя.
  • AsyncConsumerTerminatedEvent: потребитель остановлен
  • AsyncConsumerStoppedEvent: потребитель остановлен (используется SimpleMessageListenerContainer)
  • ConsumeOkEvent : когда ConsumerOk получен от брокера
  • ListenerContainerIdleEvent :
  • MissingQueueEvent: отсутствует очередь

3.6 Настройка конвертеров сообщений


// 通过注解 , 这是Bean 名
@RabbitListener(..., messageConverter = "jsonConverter")

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    ...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    ...
    return factory;
}

// PS : 
@Bean
public DefaultConversionService myConversionService() {
        DefaultConversionService conv = new DefaultConversionService();
        conv.addConverter(mySpecialConverter());
        return conv;
}


3.7 Настройка BatchListener

BatchListerner для получения целых пакетов за один вызов

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

// 接收方式
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    this.amqpMessagesReceived = amqpMessages;
    this.batch1Latch.countDown();
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    this.messagingMessagesReceived = messages;
    this.batch2Latch.countDown();
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    this.batch3Strings = strings;
    this.batch3Latch.countDown();
}

3.8 Преобразование сообщений и сериализация

public RabbitTemplate getRabbitTemplate(){
    RabbitTemplate template = new RabbitTemplate();
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}


@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
    DefaultClassMapper classMapper = new DefaultClassMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

3.9 Управление транзакциями Rabbit MQ

Управление транзакциями контролируется свойством channelTransacted в SimpleMessageListenerContainer.Если установлено значение True, платформа использует канал транзакции и завершает все операции (отправку или получение) путем фиксации или отката (в зависимости от результата), а сигнал исключения выдается на откат

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }
    
// transactionManager 主要是 RabbitTransactionManager, 其实现类 AbstractPlatformTransactionManager
    

3.10 Пре- и постобработка

  • RabbitTemplate
    • setBeforePublishPostProcessors()
    • setAfterReceivePostProcessors()
  • SimpleMessageListenerContainer
    • setAfterReceivePostProcessors()
    public RabbitTemplate getRabbitTemplate1(){
        RabbitTemplate template = new RabbitTemplate();
        template.setAfterReceivePostProcessors();
        template.setBeforePublishPostProcessors();
        return template;
    }

3.11 Автоматическое восстановление

RabbitAdmin повторно объявляет любые инфраструктурные компоненты (очереди и другие) при повторном установлении соединения, поэтому он не полагается на автоматическое восстановление, которое теперь предоставляется библиотекой amqp-client.

C- com.rabbitmq.client.ConnectionFactory
    M- setAutomaticRecoveryEnabled : client 的自动恢复开关

3.12 Метод повтора

Способ настройки повторов упоминался в предыдущем шаблоне RabbitTemplate, вот другие повторы:

пакетная повторная попытка

Класс реализации пакетных повторных попыток на основе MessageBatchRecoverer

3.13 Многоагентный подход

Мультиагент означает объявление нескольких наборов инфраструктуры (фабрики соединений, администраторы, фабрики контейнеров).


// Node 1 : 设置多工厂
@Bean
CachingConnectionFactory cf1() {
    return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2() {
    return new CachingConnectionFactory("otherHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,CachingConnectionFactory cf2) {
    SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
    rcf.setDefaultTargetConnectionFactory(cf1);
    rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2));
    return rcf;
}


// Node 2 : 设置多 Admin

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
    return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
    return new RabbitAdmin(cf2);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
    return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
    MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
                = new MultiRabbitListenerAnnotationBeanPostProcessor();
    postProcessor.setEndpointRegistry(registry);
    postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
    return postProcessor;
}


// Node 3 : 多 ContainFacotory

 @Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf1);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf2);
    return factory;
}

// Node Other

 @Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
    return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
    return new ConnectionFactoryContextWrapper(rcf);
}

    
    

4. Классовые отношения

  • SimpleMessageListenerContainer
  • RabbitAdmin

TODO: нужно улучшить

V. Решения

5.1 Кластерные решения и вопросы эффективности

проблема эффективности коммуникации

RabbitMQ передает данные через виртуальный канал соединения (канал), установленный поверх TCP.Официальное определение канала: всегоОблегченные соединения, использующие одно TCP-соединение.
PS: не рекомендуется открывать несколько TCP-соединений одновременно, так как это потребляет системные ресурсы и усложняет настройку брандмауэра.

Концепция канала:

Каждая операция протокола, выполняемая клиентом, происходит на канале. Связь по конкретному каналу полностью независима от связи по другому каналу, поэтому каждый метод протокола имеет идентификатор канала (также известный как номер канала), который используется как прокси-сервером, так и клиентом, чтобы определить, для какого канала используется метод.

Каналы существуют только в контексте соединения, а не независимо друг от друга. Когда соединение закрывается, все каналы соединения также закрываются.

Жизненный цикл канала:

Приложение открывает канал сразу после успешного открытия соединения.

ConnectionFactory cf = new ConnectionFactory();
Connection conn = cf.createConnection();
Channel ch = conn.createChannel();
ch.close();

Так же, как связи, каналы означают долгую жизнь. То есть нет необходимости открывать канал для каждой операции, что очень неэффективно, т.к. открытие канала — это обход сети туда-обратно.

Клиентская библиотека предоставляет способ наблюдения за исключениями канала и реагирования на них. Например, в Java-клиенте есть способ зарегистрировать обработчик ошибок и получить доступ к причине закрытия (закрытия) канала.

Типы кластеров и способы их развертывания

1. Обычный кластерный режим

  • Интеграция: Несколько серверов развертывают узлы отдельно, каждый узел сохраняет метаданные или экземпляр очереди, при доступе к узлу только с метаданными очереди он извлекает данные из узла с экземпляром.
  • Увеличение пропускной способности
  • Не очень доступный

2. Режим зеркального кластера

  • Интеграция: Несколько серверов развертывают узлы отдельно, один узел будет синхронизировать данные с другими узлами, поэтому каждый узел содержит все данные сообщений.
  • Высокая доступность
  • Высокая производительность

Вы можете обратиться к этой карте:Наггетс.Талант/редактор/Конечно Афан…

image.png

image.png

5.2 Проблема потери сообщений

Обычно выделяют три типа потерь:

  1. Сообщение теряется в процессе отправки, и потребитель не получает сообщение
  2. Потребители получают сообщения и смогут потреблять их в будущем.
  3. Потребитель потребляет сообщение, но потребление не завершается успешно

Тема 1: Сообщение теряется в процессе отправки, и потребитель не получает сообщение

Ядром проблемы потери является подтверждение. RabbitMQ является подтверждением отправителя. После того, как канал установлен в режим подтверждения, всем сообщениям будет присвоен идентификатор канала. После завершения потребления идентификатор успешного потребления будет возвращен в режиссер.

При внутренней ошибке возвращается сообщение nack

void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData  : 客户机在发送原始消息时提供的对象
ack : false -> nack (not ack )

Режим подтверждения отправителя является асинхронным, и приложение-производитель может продолжать отправлять сообщения, ожидая подтверждения.PS: RabbitMQ предоставляет функцию транзакций

Вопрос 2: Потребители получают сообщения и не смогут их использовать в будущем

Обычно причина этого в том, что сообщение помещается в память, и времени на его потребление нет.Лучшее решение — персистентность.

  1. Параметры очереди не сохраняются при их создании
  2. При отправке сообщения сделайте сообщение постоянным

Проблема 3: Потребитель потребляет сообщение, но потребление не завершается успешно

ACK обрабатывается потребителем через бизнес-уровень.

Суммировать :Сохранение постоянного сообщения и сообщения ACK Confirm на бизнес-уровне может в основном решить большинство проблем с надежностью.

5.3 Проблема повторного потребления

Во время создания сообщения MQ внутренне генерирует сообщение для каждого сообщения, отправленного производителем.inner-msg-id,как основа для дедупликации и идемпотентности(доставка сообщения завершается ошибкой и выполняется повторная передача), чтобы избежать дублирования сообщений, попадающих в очередь

При использовании сообщения необходимо, чтобы в теле сообщения был bizId (уникальный в глобальном масштабе для одного и того же бизнеса, такой как идентификатор платежа, идентификатор заказа, идентификатор сообщения и т. д.) в качестве основы для дедупликации и идемпотентности, чтобы чтобы избежать повторного использования одного и того же сообщения.

Распространение сообщений:
Если на очередь подписан хотя бы один потребитель, сообщения будут отправляться потребителям в циклическом режиме.

5.4 Проблемы с порядком сообщений

Когда RabbitMQ имеет несколько потребителей на узле, несмотря на наличие механизма подтверждения, все потребление не блокируется, что приводит к разной производительности каждого узла и разной эффективности потребления.Быстрые узлы, вероятно, будут иметь избыточное потребление

Одолжите лестницу этого старика, чтобы увидеть @woohoo.process on.com/view/5 из 5 часть 6…

image.png

решение :
1. Несколько очередей соответствуют одному потребителю
2. Единая очередь и один потребитель

5.5 Проблема сбоя потребления

  1. Гарантированное сохранение сообщения
  2. Обработка ошибок потребления и обратный вызов (ErrorHandler)

5.6 Проблема ограничения тока потребления

RabbitMQ предоставляет функцию qos (Quality of Service): при неавтоматическом подтверждении сообщений, если определенное количество сообщений не используется, новые сообщения не будут использоваться.

  • prefetchSize: предельный размер одного сообщения, обычно 0
  • prefetchCount: Сообщите rabbitmq, чтобы он не отправлял потребителю более N сообщений за раз, пока потребитель не подтвердит активный ответ.
  • глобальный: true\false, следует ли применять вышеуказанные настройки к каналу, то есть являются ли вышеуказанные настройки уровнем канала или уровнем потребления, как правило, false

AbstractMessageListenerContainerФункционал следующих методов представлен в

6. Показатели эффективности

TODO

Суммировать

Я чувствую, что документ не достиг ожидаемого эффекта.Ожидается найти настраиваемую точку через официальный документ + источник исходного кода, но этот эффект не достигается путем его записи.Подумав об этом, это в основном потому, что Кроличья система слишком большая, официальная Документ очень длинный, и трудно найти точку прорыва.Есть много-много вещей, которые не упомянуты во всей статье, и исходный код не очень подробный.

В ближайшие несколько дней я решил с этим побороться, и посмотреть будет ли это отдельной главой или только углубленно в этой главе.Удачи!

Список изменений:

20210407: Улучшите часть Транзакции и улучшите другие мелкие детали.
20210408: Диаграмма классов завершения плана