Очередь недоставленных сообщений RabbitMQ (5)

задняя часть RabbitMQ
Очередь недоставленных сообщений RabbitMQ (5)

Это первый раз, когда я участвую в Gengwen Challenge.23День, подробности о событии уточняйте:Обновить вызов

Со временем капли воды и камни изнашиваются 😄

Что такое очередь недоставленных сообщений

DLX ,全称为 Dead-Letter-Exchange, который можно назвать переключателем мертвой буквы. Когда сообщение становится мертвой буквой в очереди, его можно повторно отправить на другой обмен, которым является DLX.Очередь, связанная с DLX, называется очередью недоставленных сообщений..

Сообщения становятся мертвыми буквами, как правило, из-за следующих условий:

  • 1. Срок действия сообщения истекает, что является TTL, упомянутым автором в предыдущей статье. Время существования сообщения в очереди превышает установленное время TTL.

  • 2. Сообщение было отклонено и вызваноchannel.basicNackилиchannel.basicRejectспособ, ну и наборrequeueПараметрыfalse.

  • 3. Сообщение очереди достигает максимальной длины.

DLX это тоже обычный обмен, который ничем не отличается от общего обмена, его можно указать на любой очереди, по сути это установка свойств определенной очереди. Когда в этой очереди есть мертвые письмаRabbitMQЭто сообщение будет автоматически опубликовано в установленном DLX, а затем перенаправлено в другую очередь, очередь недоставленных сообщений. Сообщения в этой очереди можно отслеживать для соответствующей обработки.

Настройка очереди недоставленных сообщений

существуетchannel.queueDeclareнабор методовx-dead-letter-exchangeпараметр для добавления DLX в эту очередь. Далее описывается настройка и использование очередей недоставленных сообщений в первом и втором случаях.

срок действия сообщения истек

добавить конфигурацию

mq:
  queueBinding:
    queue: prod_queue_pay
    dlQueue: dl-queue
    exchange:
      name: exchang_prod_pay
      dlTopicExchange: dl-topic-exchange
    key: prod_pay
    dlRoutingKey: dl-routing-key

Создавайте обмен недоставленными сообщениями, очереди недоставленных сообщений и привязки между двумя


    @Value("${mq.queueBinding.exchange.dlTopicExchange}")
    private String dlTopicExchange;
    
    @Value("${mq.queueBinding.dlRoutingKey}")
    private String dlRoutingKey;
    
    @Value("${mq.queueBinding.dlQueue}")
    private String dlQueue;
    
    //创建死信交换机
    @Bean
    public TopicExchange dlTopicExchange(){
        return new TopicExchange(dlTopicExchange,true,false);
    }
    
    //创建死信队列
    @Bean
    public Queue dlQueue(){
        return new Queue(dlQueue,true);
    }
    
    //死信队列与死信交换机进行绑定
    @Bean
    public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
        return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
    }

Вы можете видеть, что в приведенном выше коде нет ничего особенного, просто создайте переключатель и очередь и свяжите их вместе.

Создавайте бизнес-очереди, бизнес-обмены и привязки между двумя

    @Value("${mq.queueBinding.queue}")
    private String queueName;
    
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    
    @Value("${mq.queueBinding.key}")
    private String key;
    
    private final String dle = "x-dead-letter-exchange";
    private final String dlk = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";
    /**
     * 业务队列
     * @return
     */
    @Bean
    public Queue payQueue(){
        Map<String,Object> params = new HashMap<>();
         //设置队列的过期时间
         //队列中所有消息都有相同的过期时间
        params.put(ttl,10000);
        //声明当前队列绑定的死信交换机
        params.put(dle,dlTopicExchange);
        //声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
        params.put(dlk,dlRoutingKey);
        
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }
    
    @Bean
    public TopicExchange payTopicExchange(){
        return new TopicExchange(exchangeName,true,false);
    }
    
    //队列与交换机进行绑定
    @Bean
    public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }

Приведенный выше код ключа业务队列создание. Указано время истечения очереди и настроена очередь недоставленных сообщений.

код производителя

/*
 * 生产者
 */
@Component
@Slf4j
public class RabbitSender {

    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;

    @Value("${mq.queueBinding.key}")
    private String key;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg){
        log.info("RabbitSender.send() msg = {}",msg);
        // 将消息发送给业务交换机
        rabbitTemplate.convertAndSend(exchangeName,key,msg);
    }

}

Обеспечьте внешний подход

    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg){
        rabbitSender.send(msg);
    }

Запустив службу, вы увидите, что очередь службы, переключатель службы, очередь недоставленных сообщений и переключатель недоставленных сообщений создаются одновременно. И вы можете увидеть метки DLX и DLK в бизнес-очереди.image.png image.pngЗатем вызовите интерфейс:http://localhost:8080/?msg=Бум, сообщение будет отправлено наprod_queue_payэта очередь.

image.pngЕсли ни один потребитель не потребляет сообщение в течение 10 секунд, срок действия сообщения определяется как просроченный. Поскольку DLX установлен, сообщения выбрасываются по истечении срока действия.dlxExchangeпереключатель, в соответствии с настроеннымdlRoutingKeyнайти сdlxExchangeсоответствующая очередьdlQueueПосле сохранения сообщения вdlxQueueв очереди мертвых писем.image.png

сообщение отклонено

Удалите время истечения срока действия бизнес-очереди.

/**
     * 业务队列
     * @return
     */
    @Bean
    public Queue payQueue(){
        Map<String,Object> params = new HashMap<>();
        //声明当前队列绑定的死信交换机
        params.put(dle,dlTopicExchange);
        //声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
        params.put(dlk,dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }

новый файл конфигурации

rabbitmq:
    listener:
      simple:
        #消息确认方式 manual 手动确认 auto 自动确认 none 不管
        acknowledge-mode: manual

Добавить потребителей

/**
 * 消费者
 */
@Component
@Slf4j
public class RabbitReceiver {

    //测试消费者进行消费发送异常 是否进入死信队列
    @RabbitListener(queues = "${mq.queueBinding.queue}")
    public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("收到信息:{}",data);
        boolean ack = false;
        Exception exception = null;
        try {
            if(data.contains("888")){
               throw new RuntimeException("信息敏感");
            }
        } catch (RuntimeException e) {
            ack = true;
            exception = e;
        }
        if (ack){
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
         //注意第三个参数需要为false
        //true则重新放入原队列,否则丢弃或者进入死信队列。
            channel.basicNack(tag, false, false);
        } else {
        //进行手动确认信息已经被消费
            channel.basicAck(tag, false);
        }
    }
}

В приведенном выше кодеchannel.basicNackметод, если третий параметр установлен вtrue, возникнет проблема с бесконечными повторами. В следующей статье автор расскажет, как решить проблему бесконечных повторных попыток.

Удалите предыдущую очередь, затем перезапустите и повторно создайте бизнес-очередь. можно увидеть业务队列Метка имеет меньше TTL.

image.pngИнтерфейс запроса:http://localhost:8080/?msg=888Из-за конфиденциального сообщения бизнес будет ненормальным, и сообщение будет перенаправлено в очередь недоставленных сообщений.image.png

  • Если у вас есть какие-либо вопросы по этой статье или есть ошибки в этой статье, пожалуйста, оставьте комментарий. Если вы считаете, что эта статья была вам полезна, ставьте лайк и подписывайтесь на нее.