Используя механизм подтверждения сообщения springboot + rabbitmq, я чувствую, что попал в яму

Java
Используя механизм подтверждения сообщения springboot + rabbitmq, я чувствую, что попал в яму

Эта статья размещена в личном блоге:www.chengxy-nds.top, совместное использование технических ресурсов и совместный прогресс

Недавно отдел призвал всех организовать больше сеансов обмена техническими данными, сказав, что это должно активировать техническую атмосферу компании, но я все видел и знаю, что эта ТМ для чистки.KPI. Тем не менее, это действительно хорошо: вместо этих скучных совещаний по спорам очень полезно для личностного роста проводить больше технических обменов.

Так что я взял на себя инициативу зарегистрироваться, чтобы участвовать в обмене, кхе-кхе~, на самом деле не для этого.KPI, Я просто хочу учиться с вами, ребята!

在这里插入图片描述
вставьте сюда описание изображения

На этот раз я делюсьspringboot + rabbitmqКак реализовать механизм подтверждения сообщения и немного опыта в реальной разработке, на самом деле, общий контент относительно прост, и иногда вещи настолько волшебны, что чем проще, тем легче они могут пойти не так.

можно увидеть б/уRabbitMQВ дальнейшем наши деловые связи стали значительно длиннее, хотя развязка между системами была достигнута, количество сценариев, которые могут привести к потере сообщений, также увеличилось. Например:

  • производитель сообщений -> сервер rabbitmq (отправка сообщения не удалась)

  • Сбой самого сервера rabbitmq приводит к потере сообщений

  • потребитель сообщений -> служба rabbitmq (не удалось использовать сообщение)

在这里插入图片描述Поэтому по возможности старайтесь не использовать middleware, если вы используете его ради использования, это только добавит проблем. После включения механизма подтверждения сообщения, несмотря на то, что точная доставка сообщения в значительной степени гарантирована из-за частых взаимодействий с подтверждением,rabbitmqОбщая эффективность становится ниже, а пропускная способность серьезно падает.Настоятельно не рекомендуется использовать механизм подтверждения сообщения для сообщений, которые не очень важны.


Далее реализуемspringboot + rabbitmqМеханизм подтверждения сообщения, а затем сделать конкретный анализ возникших проблем.

1. Подготовьте среду

1. Представьте пакет зависимостей rabbitmq

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Измените конфигурацию application.properties.

Его нужно включить в настройках发送端и消费端подтверждение сообщения.

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true

3. Определите обмен и очередь

определить переключательconfirmTestExchangeи очередьconfirm_test_queueи привязать очередь к обмену.

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}

rabbitmqПодтверждение сообщения делится на две части: подтверждение отправки сообщения и подтверждение получения сообщения.

在这里插入图片描述
вставьте сюда описание изображения

2. Подтверждение отправки сообщения

Подтверждение отправки сообщения: используется для подтверждения производителяproducerотправить сообщениеbroker,brokerвключитьexchangeрепост в очередьqueueВо время процесса, было ли сообщение успешно доставлено.

Сообщение отproducerприбытьrabbitmq brokerсуществует одинconfirmCallbackРежим подтверждения.

Сообщение отexchangeприбытьqueueПроизошел сбой доставкиreturnCallbackЗадний режим.

Мы можем использовать эти дваCallbackОбеспечить 100% доставку товара.

1. ConfirmCallback режим подтверждения

сообщение, покаrabbitmq brokerСрабатывает при полученииconfirmCallbackПерезвони .

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

реализовать интерфейсConfirmCallback, переписывая егоconfirm()метод с тремя параметрамиcorrelationData,ack,cause.

  • correlationData: внутри объекта есть только одинidАтрибут, используемый для указания уникальности текущего сообщения.
  • ack: сообщение доставленоbrokerстатус,trueУказывает на успех.
  • cause: Указывает причину сбоя доставки.

но сообщение былоbrokerПолучено может указывать только на то, что оно достигло сервера MQ, и не гарантирует, что сообщение будет доставлено адресату.queueвнутри. Итак, далее вам нужно использоватьreturnCallback.

2. Режим возврата обратного вызова ReturnCallback

Если сообщение не может быть доставлено адресатуqueueвызовет обратный вызовreturnCallback, раз кqueueЕсли доставка сообщения не удалась, здесь обычно записываются подробные данные о доставке текущего сообщения, что удобно для последующих операций, таких как повторная передача или компенсация.

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

реализовать интерфейсReturnCallback, переписатьreturnedMessage()метод, метод имеет пять параметровmessage(Сообщение),replyCode(код ответа),replyText(содержание ответа),exchange(выключатель),routingKey(очередь).

Ниже приводится конкретное сообщение, отправленное вrabbitTemplateустановить вConfirmиReturnобратный звонок, мы передаемsetDeliveryMode()Сохраните сообщение и создайте его для последующего тестирования.CorrelationDataобъект, добавьтеidза10000000000.

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消费者确认收到消息后,手动ack回执回调处理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投递到队列失败回调处理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 发送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

3. Подтверждение получения сообщения

Подтверждение получения сообщения немного проще, чем подтверждение отправки сообщения, потому что есть только одно подтверждение сообщения (ack) процесс. использовать@RabbitHandlerМетод аннотации аннотации должен быть увеличенchannel(канал),messageдва параметра.

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具体业务
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            
            if (message.getMessageProperties().getRedelivered()) {
                
                log.error("消息已重复处理失败,拒绝再次接收...");
                
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                
                log.error("消息即将再次返回队列处理...");
                
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

Существует три метода получения сообщений, давайте проанализируем значение каждого метода.

1. Базовый доступ

basicAck: Указывает на успешное подтверждение.После использования этого метода получения сообщение будетrabbitmq brokerУдалить.

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag: указывает порядковый номер доставки сообщения.После того, как каждое сообщение использовано или сообщение повторно доставлено,deliveryTagповысится. В ручном режиме подтверждения сообщения мы можем указатьdeliveryTagновости оack,nack,rejectи так далее.

multiple: Подтверждать ли партиями, значение равноtrueбудет одноразовымackВсе сообщения меньше текущегоdeliveryTagНовости.

Возьмите каштан:Предположим, я сначала отправляю три сообщенияdeliveryTagИх 5, 6, 7, но ни один из них не подтвердился, когда я отправляю четвертое сообщениеdeliveryTag8,multipleЕсли установлено значение true, все сообщения 5, 6, 7 и 8 будут подтверждены.

2. Базовый Нак

basicNack: Указывает на подтверждение ошибки.Этот метод обычно используется, когда бизнес сообщений потребления ненормальный, и сообщение может быть повторно доставлено в очередь.

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag: Указывает порядковый номер доставки сообщения.

multiple: Подтверждать ли партиями.

requeue: значениеtrueСообщение будет повторно поставлено в очередь.

3. Базовый отказ

basicReject: отклонить сообщение, сbasicNackРазница в том, что нельзя выполнять пакетные операции, другие способы использования очень похожи.

void basicReject(long deliveryTag, boolean requeue)

deliveryTag: Указывает порядковый номер доставки сообщения.

requeue: значениеtrueСообщение будет повторно поставлено в очередь.

4. Тест

Отправьте сообщение, чтобы проверить, действует ли механизм подтверждения сообщения.Из результата выполнения видно, что отправитель успешно перезванивает после отправки сообщения, а потребитель успешно использует сообщение.在这里插入图片描述Используйте инструмент захвата пакетовWiresharkнаблюдатьrabbitmqМного изменений во взаимодействии протокола amqpackпроцесс.在这里插入图片描述

5. Наступить на бревно ямы

1. Нет подтверждения сообщения

Это очень нетехническая яма, но здесь очень легко ошибиться.

Включите механизм подтверждения сообщений, не забывайте потреблять сообщенияchannel.basicAck, в противном случае сообщение будет существовать всегда, что приведет к повторному потреблению.在这里插入图片描述

2. Неограниченная доставка сообщений

Когда я впервые столкнулся с механизмом подтверждения сообщения, код потребителя был написан следующим образом, идея очень проста: подтвердить сообщение после обработки бизнес-логики,int a = 1 / 0Повторно поставить сообщение в очередь после возникновения исключения.

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消费者 2 号收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

Но есть проблема, как только появляется бизнес-кодbugВ 99,9% случаев оно не будет автоматически восстановлено, сообщение будет доставлено в очередь на неопределенное время, а потребитель будет бесконечно его выполнять, в результате чего получится бесконечный цикл.

在这里插入图片描述
вставьте сюда описание изображения

местныйCPUОн был заполнен в одно мгновение, вы можете себе представить, как я паниковал, когда в то время сервис рухнул в производственной среде.

在这里插入图片描述иrabbitmq managementЕсть только одно неподтвержденное сообщение.

在这里插入图片描述
вставьте сюда описание изображения

После тестирования и анализа обнаружено, что при повторной доставке сообщения в очередь сообщений сообщение не возвращается в хвост очереди, а все еще находится в начале очереди.

Потребитель немедленно обработает сообщение, бизнес-процесс выдаст исключение, сообщение будет повторно поставлено в очередь и так далее. Блокирует обработку очереди сообщений, в результате чего обычные сообщения не выполняются.

Наше решение в то время состояло в том, чтобы сначала ответить на сообщение. В это время очередь сообщений удалит сообщение. В то же время мы снова отправляем сообщение в очередь сообщений. Сообщение об исключении помещается в конец очередь сообщений, которая гарантирует, что сообщение не потеряется, но и для обеспечения нормального ведения бизнеса.

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

Однако этот метод не решает фундаментальной проблемы, и сообщение об ошибке все равно будет время от времени сообщаться. Позже количество повторных попыток сообщения оптимизируется и устанавливается. Когда лимит повторных попыток достигнут, вручную подтвердите сообщение, удалите сообщение из очереди и сохранить сообщение вMySQLИ нажмите будильник, выполните ручную обработку и временные задачи для компенсации.

3. Повторное потребление

Как обеспечить идемпотентность потребления MQ, это необходимо определить в соответствии с конкретным бизнесом, вы можете использоватьMySQL,илиredisСохраните сообщение и проверьте атрибут уникальности в сообщении.

demoизGitHubАдрес https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm


Оригинальность не так проста, содержание вывода волос горит, если есть потеря, пожалуйста, нажмите и поддержите его!

Разобраны и розданы друзьям сотни различных технических электронных книг. Подпишитесь на официальный аккаунт, чтобы ответить【666] Самовывоз. Мы создали группу технического обмена с некоторыми друзьями, чтобы обсуждать технологии и делиться технической информацией, стремясь учиться и развиваться вместе.Если вам интересно, отсканируйте код, чтобы присоединиться к нам!

В этой статье используетсяmdniceнабор текста