Механизм повторных попыток RabbitMQ (11)

RabbitMQ
Механизм повторных попыток RabbitMQ (11)

Это первый раз, когда я участвую в Gengwen Challenge.30День, подробности о событии уточняйте:Обновить вызов

Со временем капли воды и камни изнашиваются 😄

предисловие

Потребители могут столкнуться с исключениями в процессе обработки сообщений, так что же делать с этим аномальным сообщением в данный момент?

RabbitMQЕсть два методаchannel.basicNackилиchannel.basicRejectСообщение может быть возвращено в исходную очередь, чтобы можно было выполнить повторную попытку. Однако, если второе потребление снова ненормально, то потребление было ненормальным все время. Поскольку нет четкого количества повторных попыток, это создаст бесконечные повторные попытки, что является фатальной проблемой.

В этой статье использовалосьspring-rabbitПриходит сretryфункцию для решения этой проблемы.

кодирование

полагаться

starter-amqpсодержитspring-rabbit.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
    <scope>compile</scope>
</dependency>

настроить

Требует простая конфигурация, чтобы включить

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 消息确认方式,其有三种配置方式,分别是none、manual(手动ack) 和auto(自动ack) 默认auto
        retry:
          enabled: true  #监听重试是否可用
          max-attempts: 5   #最大重试次数 默认为3
          initial-interval: 2000  # 传递消息的时间间隔 默认1s
    host: 47.105.*
    port: 5672
    virtual-host: /*-1
    username: *
    password: *
mq:
  queueBinding:
    queue: prod_queue_pay
    exchange:
      name: exchang_prod_pay
      type: topic
    key: prod_pay

Создайте бизнес-очередь, переключитесь

@Configuration
public class RabbitConfig {

    @Value("${mq.queueBinding.queue}")
    private String queueName;
    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;
    @Value("${mq.queueBinding.key}")
    private String key;
    /**
     * 业务队列
     * @return
     */
    @Bean
    public Queue payQueue(){
        Map<String,Object> params = new HashMap<>();
        return QueueBuilder.durable(queueName).withArguments(params).build();

    }
    
    @Bean
    public TopicExchange payTopicExchange(){
        return new TopicExchange(exchangeName,true,false);
    }
    //队列与交换机进行绑定
    @Bean
    public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
        return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
    }
}

режиссер

@Component
@Slf4j
public class RabbitSender {

    @Value("${mq.queueBinding.exchange.name}")
    private String exchangeName;

    @Value("${mq.queueBinding.key}")
    private String key;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String msg){
        log.info("RabbitSender.send() msg = {}",msg);
        // 将消息发送给业务交换机
        rabbitTemplate.convertAndSend(exchangeName,key,msg);
    }

}

потребитель

@Component
@Slf4j
public class RabbitReceiver {
  int count  = 0;

    //测试重试
    @RabbitListener(queues = "${mq.queueBinding.queue}")
    public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("重试次数 = {}",count++);
        int i = 10 /0;
        channel.basicAck(tag,false);
    }
}

Обеспечьте внешний подход

@Controller
public class TestController {

    @Autowired
    private RabbitSender rabbitSender;

    @GetMapping
    public void test(@RequestParam String msg){
        rabbitSender.send(msg);
    }
}

Затем вызовите интерфейс:http://localhost:8080/?msg=Бум, сообщение будет отправлено наprod_queue_payэта очередь. Затем повторите попытку 5 раз.image.pngВременной интервал между каждой повторной попыткой составляет 2 секунды, что соответствует конфигурации.

注意: 重试并不是 RabbitMQ 重新发送了消息到了队列,仅仅是消费者内部进行了重试,换句话说就是重试跟mq没有任何关系。上述消费者代码不能添加try{}catch(){},一旦捕获了异常,在自动 ack 模式下,就相当于消息正确处理了,消息直接被确认掉了,不会触发重试的。Конечно, это не значит, что вы не можете добавитьtry{}catch(){}, но исключение не может быть обработано. Это можно записать следующим образом:

 @RabbitListener(queues = "${mq.queueBinding.queue}")
    public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("重试次数 = {}",count++);
        try {
            // 处理主要业务
            int i = 10 /0;
        } catch (Exception e) {
            // 处理业务失败,还要进行其他操作,比如记录失败原因
            log.info("记录失败原因 ====>");
            throw new RuntimeException("手动抛出");
        }
        channel.basicAck(tag,false);
    }

image.png

MessageReCoverer

В приведенном выше примере мы также обнаружили проблему в тесте, то есть после 5 повторных попыток консоль выводит ненормальный лог стека, а затем данные в очереди также аккуются (т.к. я настроил авто, модель автоматического акка). Если вы настроитеmanual(Ручное подтверждение), результаты будут следующими:

image.pngПосле пяти попыток потребление находится в неподтвержденном состоянии. Потому что вам нужно вручную акк! В следующий раз, когда служба будет перезапущена, она продолжит использовать это сообщение.

Во-первых, давайте посмотрим, что представляет собой этот журнал исключений:

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Retry Policy Exhausted

Причина приведенного выше исключения заключается в том, что сборкаSimpleRabbitListenerContainerFactoryConfigurerкласс используетсяMessageRecovererинтерфейс, этот интерфейс имеетrecoverМетод используется для обработки сообщения после завершения повторной попытки.Исходный код выглядит следующим образом:

public final class SimpleRabbitListenerContainerFactoryConfigurer
		extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

	@Override
	public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
		PropertyMapper map = PropertyMapper.get();
		RabbitProperties.SimpleContainer config = getRabbitProperties().getListener().getSimple();
		configure(factory, connectionFactory, config);  >> 1
		map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
		map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
		map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
	}

}

Примечание помечено как>> 1изconfigureметод

ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
    RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
        : RetryInterceptorBuilder.stateful();
    RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
        .createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
    builder.retryOperations(retryTemplate);
    MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
        : new RejectAndDontRequeueRecoverer(); //<1>
    builder.recoverer(recoverer);
    factory.setAdviceChain(builder.build());

Внимательно следить<1>Код в , по умолчанию используетсяRejectAndDontRequeueRecovererКласс, этот класс уже появился, обратите внимание на картинки перед автором. По названию класса видно, что роль класса реализацииОтклонено, и сообщение не будет отправлено обратно в очередьТо есть после повторения, если это не успешно, считается, что сообщение не может быть сохранено, чтобы отказаться от него. Мы можем посмотреть на этот конкретный класс реализации:

public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
    protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
    @Override
    public void recover(Message message, Throwable cause) {
        if (this.logger.isWarnEnabled()) {
                this.logger.warn("Retries exhausted for message " + message, cause);
        }
        throw new ListenerExecutionFailedException("Retry Policy Exhausted",
                                new AmqpRejectAndDontRequeueException(cause), message);
    }
}

Приведенный выше исходный код дает источник исключения.

MessageRecovererтолько один интерфейсrecoverметод, обратный вызов для сообщений, которые были использованы, но все повторные попытки оказались неудачными.image.png

переписанныйrecoverСуществует четыре класса методов,MessageBatchRecovererЭто за пределами объема этой статьи.image.pngиRejectAndDontRequeueRecovererФункцию уже видели, она ведь дефолтная. Затем есть два других класса реализации, а именноRepublishMessageRecovererиImmediateRequeueMessageRecoverer, что означает соответственноПовторно опубликуйте сообщение и немедленно вернитесь в исходную очередь., давайте проверим действие этих двух классов реализации по отдельности.

RepublishMessageRecoverer

Повторно отправить сообщение в указанную очередь. Сначала создайте очередь, затем привяжите ее к коммутатору и установите после привязкиMessageRecoverer. существуетRabbitConfigУвеличение класса кода. иочередь недоставленных сообщенийВыглядит примерно так же.

@Autowired
private RabbitTemplate rabbitTemplate;


private static String errorTopicExchange = "error-topic-exchange";
private static String errorQueue = "error-queue";
private static String errorRoutingKey = "error-routing-key";

//创建异常交换机
@Bean
public TopicExchange errorTopicExchange(){
    return new TopicExchange(errorTopicExchange,true,false);
}

//创建异常队列
@Bean
public Queue errorQueue(){
    return new Queue(errorQueue,true);
}
//队列与交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
    return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
}


//设置MessageRecoverer
@Bean
public MessageRecoverer messageRecoverer(){
    //AmqpTemplate和RabbitTemplate都可以
    return new RepublishMessageRecoverer(rabbitTemplate,errorTopicExchange,errorRoutingKey);
}

Запускаем сервис, снова вызываем интерфейс и смотрим результат:

image.pngКак видно из консоли, мы используем настроенную нами конфигурациюRepublishMessageRecoverer, и сообщение повторяется 5 раз напрямую с новымroutingKeyОтправлено на конфигурацию коммутатора, на этот раз для просмотра страницы мониторинга вы можете увидеть оригинал не имеет сообщения в очереди, конфигурация очереди ненормального присутствия в сообщении.image.png

ImmediateRequeueMessageRecoverer

использоватьImmediateRequeueMessageRecoverer, сообщения, которые не удалось повторить, будут немедленно возвращены в исходную очередь. ИсправлятьmessageRecovererметод

@Bean
public MessageRecoverer messageRecoverer(){
    return new ImmediateRequeueMessageRecoverer();
}

Запускаем сервис, снова вызываем интерфейс и смотрим результат:

image.pngПосле 5 попыток вернитесь в очередь, а затем снова используйте, продолжайте повторять 5 попыток и повторяйте, пока сообщение не будет использовано нормально.

Суммировать

Через вышеупомянутый тест, для сообщения, которое все еще ненормально после повторения, вы можете использоватьRepublishMessageRecoverer, отправлять сообщения в другие очереди, а затем обрабатывать их специально для новой очереди.

очередь недоставленных сообщений

В дополнение к вышеперечисленномуRepublishMessageRecoverer, вы также можете использовать очередь недоставленных сообщений для обработки сообщений о неудачных повторных попытках. Это тоже наш обычный способ.

Создавайте обмен недоставленными сообщениями, очереди недоставленных сообщений и привязки между двумя

продолжатьRabbitConfigДобавить конфигурацию

private static String dlTopicExchange = "dl-topic-exchange";
private static String dlQueue = "dl-queue";
private static String dlRoutingKey = "dl-routing-key";

//创建交换机
@Bean
public TopicExchange dlTopicExchange(){
    return new TopicExchange(dlTopicExchange,true,false);
}
//创建队列
@Bean
public Queue dlQueue(){
    return new Queue(dlQueue,true);
}
//队列与交换机进行绑定
@Bean
public Binding BindingDlQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
    return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}

死信交换机的定义和普通交换机的定义完全相同,队列绑定死信交换机与绑定普通交换机的方式完全相同,死信交换机就是一个普通的交换机,只是换了一个叫法而已,没有什么特殊之处。

Изменить настройку

Изменить конфигурацию бизнес-очереди, а также предоставленную ранееMessageReCovererСделайте комментарий, иначе переключатель недоставленных писем не вступит в силу и будет использовать настроенную нами конфигурацию.MessageReCovererглавный.

    /**
     * 绑定死信交换机需要给队列设置如下两个参数     
     * 业务队列
     * @return
     */
    @Bean
    public Queue payQueue(){
        Map<String,Object> params = new HashMap<>();
        //声明当前队列绑定的死信交换机
        params.put("x-dead-letter-exchange",dlTopicExchange);
        //声明当前队列的死信路由键
        params.put("x-dead-letter-routing-key",dlRoutingKey);
        return QueueBuilder.durable(queueName).withArguments(params).build();
    }
    
     //设置MessageRecoverer
    //@Bean
    //public MessageRecoverer messageRecoverer() {
        //AmqpTemplate和RabbitTemplate都可以
        //return new ImmediateRequeueMessageRecoverer();
   // }
  

Перед запуском службы необходимо удалить созданную ранее очередь, т.к. конфигурация очереди на этот раз была изменена. После успешного запуска вы можете видеть, что очередь обслуживания, очередь недоставленных сообщений, переключатель обслуживания и переключатель недоставленных сообщений создаются одновременно.

image.png

image.pngЛоготипы DLX и DLK появляются в бизнес-очереди, что означает, что коммутатор недоставленных сообщений и ключ маршрутизации недоставленных сообщений были связаны.В это время вызывается производитель для отправки сообщения.После того, как потребитель повторит попытку 5 раз , из-заMessageCoverКласс внедрения по умолчаниюRejectAndDontRequeueRecoverer, а поскольку бизнес-очередь привязана к очереди недоставленных сообщений, сообщение будет удалено из бизнес-очереди и одновременно отправлено в очередь недоставленных сообщений.

image.png image.png

использованная литература

Механизм повторных попыток RabbitMQ

  • Если у вас есть какие-либо вопросы по этой статье или есть ошибки в этой статье, пожалуйста, оставьте комментарий. Если вы считаете, что эта статья была вам полезна, ставьте лайк и подписывайтесь на нее.