Надежность RabbitMQ, повторное потребление, последовательность, решение для невыполненных сообщений

задняя часть RabbitMQ

предисловие

Предыдущая статья представилаЗачем вводить очереди сообщений?Внедрение MQ решило для нас некоторые проблемы, но в то же время оно принесло некоторые сложные проблемы, которые являются ключевыми моментами, которые необходимо решать в крупных проектах, и, что более важно, часто задаются интервью.На самом деле можно сказать, что очередь сообщений не может гарантировать 100% надежность! Соответствующий механизм, предоставляемый RabbitMQ, предназначен только для уменьшения вероятности потери сообщения или для обеспечения функции ведения журнала после потери сообщения.При решении этих проблем необходимо понимать, что на самом деле у небольших компаний малый объем бизнеса и низкий параллелизм, вряд ли эти проблемы возникнут... Даже если они возникают изредка, разработчики могут вручную исправить обработку данных. Поэтому мы можем объединить реальные бизнес-сценарии компании, чтобы увидеть, нужно ли решать эти задачи.

надежность сообщения

На примере создания заказа может появиться такой бизнес-сценарий

image.png

  • MQ зависает, и сообщение не отправляется. Что делать, если нижестоящие системы нескольких купонов и баллов после создания заказа не производят взаиморасчетов?
  • MQ высокодоступен, сообщение отправлено, но бизнес расчета купона сообщает об ошибке? Поскольку это асинхронно, откатить его непросто.
  • Сообщение отправляется нормально, потребитель также его получает, система продавца и система купонов работают нормально, а бизнес по очкам сообщает об ошибке, а очки не рассчитываются, тогда данные этого заказа несовместимы.

Чтобы решить вышеуказанные проблемы, необходимо убедиться, что сообщение должно потребляться надежно, тогда мы можем проанализировать, какие шаги сообщения пойдут не так. RabbitMQ отправляет такое сообщение:

image.png

Сообщение отправляется производителем на указанный обмен и направляется в связанную очередь в соответствии с правилами маршрутизации, а затем передается потребителю. Во время этого процесса могут быть сценарии, когда сообщение идет не так:

  • Сообщение производителя не дошло до коммутатора, что эквивалентно потере сообщения производителем.
  • Коммутатор не направляет сообщение в очередь, что эквивалентно потере сообщения производителем.
  • Простой RabbitMQ приводит к потере сообщений в очередях и очередях, что эквивалентно потере сообщений в RabbitMQ
  • Потребительское потребление ненормально, а бизнес не выполняется, что равносильно потере потребителями сообщений.

Производитель теряет сообщение

RabbitMQ предоставляет механизмы подтверждения и отката.Присутствует асинхронный механизм мониторинга.Каждый раз при отправке сообщения, если оно успешно/неуспешно отправлено на коммутатор, может срабатывать мониторинг, а также будет мониторинг при маршрутизации от коммутатора в очередь не удается. Вам нужно только включить эти два механизма мониторинга.В качестве примера возьмем интеграцию SpringBoot с RabbitMQ.

Внедрить стартер зависимостей

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

конфигурационный файл

rabbitmq:
    publisher-returns: true
    publisher-confirm-type: correlated #新版本 publisher-confirms: true 已过时

Затем напишите обратный вызов слушателя

@Configuration
@Slf4j
public class RabbitMQConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void enableConfirmCallback() {
        //confirm 监听,当消息成功发到交换机 ack = true,没有发送到交换机 ack = false
        //correlationData 可在发送时指定消息唯一 id
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if(!ack){
                //记录日志、发送邮件通知、落库定时任务扫描重发
            }
        });
        
        //当消息成功发送到交换机没有路由到队列触发此监听
        rabbitTemplate.setReturnsCallback(returned -> {
            //记录日志、发送邮件通知、落库定时任务扫描重发
        });
    }
}

При тестировании можно умышленно написать неправильное имя коммутатора и ключ маршрутизации при отправке сообщения, и тогда оно перезвонит только что написанному нами методу мониторинга, потому что покажет нам конкретную причину, по которой оно не было отправлено на коммутатор ; возвращаемый объект содержит информацию, связанную с сообщением.

На самом деле, насколько мне известно, некоторые компании не будут ретранслировать на эти два монитора, почему? Цена слишком высока... Во-первых, очень низка возможность потерять сам RabbitMQ, а во-вторых, если вам нужно сбросить библиотеку, а затем использовать запланированное задание для сканирования и повторной отправки, вам потребуется разработать кучу кода, распределенные задачи по расписанию... Во-вторых, сканирование задач по расписанию определенно увеличит задержку сообщения, что не обязательно. Реальный бизнес-сценарий состоит в том, чтобы записать лог, что удобно для отслеживания проблемы, и попутно отправить электронное письмо соответствующему персоналу.Если крайне редко производитель теряет сообщение, то разработка может дополнить данные в база данных.

RabbitMQ теряет сообщения

Без включенного сохраняемости все очереди и сообщения исчезнут после перезапуска RabbitMQ, поэтому мы устанавливаем сохраняемость при создании очереди, а затем устанавливаем сохраняемость сообщений при отправке сообщений (просто установите для deliveryMode значение 2). Вообще говоря, настойчивость необходима в реальном бизнесе.

Потребители теряют сообщения

Так называемая потеря сообщений на стороне потребителя означает, что сторона потребителя выполняет бизнес-код и сообщает об ошибке, поэтому дело, которое должно быть выполнено, фактически не выполняется. Например, если заказ создан успешно и сообщается об ошибке расчета купона, по умолчанию RabbitMQ будет думать, что сообщение было использовано, пока он отправляет сообщение потребителю и удаляет его из очереди, но купон не был урегулирован, что эквивалентно сообщению Lost in disguise. Такая ситуация до сих пор очень распространена, ведь наши разработчики не могут гарантировать, что их код не будет сообщать об ошибках, такого рода проблемы необходимо решать. В противном случае пользователь размещает заказ, купон не списывается, и ваша оценка эффективности за этот месяц пропадает...

RabbitMQ предоставляет нам механизм ответа потребителя (подтверждения).По умолчанию этот механизм является автоматическим ответом. Пока сообщение передается потребителю, он автоматически подтверждает, а затем RabbitMQ удаляет сообщение в очереди. После включения ручного подтверждения RabbitMQ удалит сообщение из очереди только после того, как мы вызовем API на стороне потребителя для подтверждения ручного подтверждения.

Сначала включите ручное подтверждение в файле конфигурации

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #手动应答

Затем вручную ответьте на сообщение о выходе в потребительском коде.

    @RabbitListener(queues = "queue")
    public void listen(String object, Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("消费成功:{},消息内容:{}", deliveryTag, object);
        try {
            /**
             * 执行业务代码...
             * */
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error("签收失败", e);
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException exception) {
                log.error("拒签失败", exception);
            }
        }
    }

Шагая по яме

Если вы используете код вышеприведенной схемы в производственной среде, как только произойдет ошибка потребления, вы столкнетесь с ошибкой. Поскольку третий параметр метода basicNack указывает, возвращаться ли в очередь, если вы заполните false, сообщение будет отброшено напрямую, что равносильно тому, что не гарантируется надежность сообщения. Если вы заполните значение true, при возникновении ошибки потребления сообщение будет возвращено в начало очереди сообщений, продолжится отправка потребителю и продолжит потребление сообщения.Обычно ошибка кода не будет разрешено повторной попыткой, поэтому это сообщение произойдет: продолжать потребляться, продолжать сообщать об ошибках, возвращаться в очередь, продолжать потребляться... бесконечный цикл

Так что реальная сцена это вообще три варианта

  • Когда потребление терпит неудачу, это сообщение сохраняется в Redis, и количество потребления записывается.Если потребление терпит неудачу три раза, сообщение отбрасывается, а журнал сохраняется в базе данных.
  • Заполните false напрямую, не возвращайтесь в очередь, записывайте логи, отправляйте электронные письма и ждите ручной обработки разработчиками.
  • Не включайте ручное подтверждение, повторите попытку с сообщениями, предоставленными SpringBoot.

Повторы сообщений, предоставляемые SpringBoot

На самом деле, во многих сценариях нет необходимости включать режим ответа потребителя, потому что SpringBoot предоставляет нам механизм повтора: когда бизнес-метод, выполняемый потребителем, сообщает об ошибке, он повторяет попытку выполнения бизнес-метода потребителя.

Включите механизм повторных попыток, предоставляемый SpringBoot.

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 3 #重试次数

потребительский код

    @RabbitListener(queues = "queue")
    public void listen(String object, Message message, Channel channel) throws IOException {
        try {
            /**
             * 执行业务代码...
             * */
            int i = 1 / 0; //故意报错测试
        } catch (Exception e) {
            log.error("签收失败", e);
            /**
             * 记录日志、发送邮件、保存消息到数据库,落库之前判断如果消息已经落库就不保存
             * */
            throw new RuntimeException("消息消费失败");
        }
    }

Обратите внимание, что вы должны вручную генерировать исключение, потому что SpringBoot инициирует повторную попытку на основе возникновения неперехваченных исключений в методе.. Стоит отметить, что этот повтор предоставляется SpringBoot для повторного выполнения метода потребителя вместо того, чтобы позволить RabbitMQ повторно отправить сообщение.

Сводка по надежности сообщений

На самом деле, после тщательного исследования вы обнаружите, что так называемая надежность сообщений сама по себе не может быть гарантирована... Так называемые различные механизмы надежности предназначены только для предоставления запрашиваемых журналов на случай потери сообщений в будущем, но эти механизмы стоят немного (огромных денег). ) стоимость действительно способна снизить вероятность потери сообщения

порядок сообщений

В некоторых бизнес-сценариях сообщения должны обрабатываться последовательно, например, при использовании канала для подписки на двоичный журнал MySQL для обновления Redis.Обычно мы отправляем изменения данных, подписанные каналом, в очередь сообщений.

image.png

Если последовательное использование RabbitMQ не гарантируется, в Redis могут появиться грязные данные.

один экземпляр потребителя

На самом деле сама очередь в порядке, но экземпляры службы производственной среды, как правило, представляют собой кластеры.Когда потребителем является несколько экземпляров, сообщения в очереди будут распределяться по всем экземплярам для потребления (одно и то же сообщение может быть отправлено только одному Consumer instance ), поэтому нет гарантии потребления порядка сообщений, потому что вы не можете гарантировать, какая машина быстрее выполнит бизнес-код на стороне потребителя.

image.png

Таким образом, для бизнеса, которому необходимо гарантировать последовательное потребление, мы можем развернуть только один экземпляр потребителя, затем настроить RabbitMQ для отправки только одного сообщения за раз, а затем включить ручное подтверждение, конфигурация выглядит следующим образом.

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 #每次只推送一个消息
        acknowledge-mode: manual

Таким образом, RabbitMQ будет выталкивать из очереди только одно сообщение за раз.После завершения обработки мы подтверждаем ответ и потребляем следующее, чтобы обеспечить порядок сообщений.

Несколько экземпляров потребителей

В случае экземпляров RabbitMQ с множественным потреблением очень сложно обеспечить порядок сообщений, а деталей много.Одним предложением: я не буду...

Повторное потребление сообщения (идемпотентность)

Это также распространенный сценарий в бизнесе производственной среды.Мой блог использует RabbitMQОчень странно, что журнал часто показывает, что сообщение потребляется дважды.

У нас есть два подхода к решению проблемы повторного потребления сообщений: первый — предотвратить повторное использование сообщения потребителем, а второй — разрешить многократное использование, но это не повлияет на мои бизнес-данные.

Убедитесь, что потребитель выполняет его только один раз

Вообще говоря, повторное потребление сообщений потребляется несколько раз за короткий момент.Мы можем использовать Redis для хранения уникального идентификатора потребляемого сообщения, а затем определить, существует ли уже этот идентификатор в Redis, до выполнения потребительского бизнеса. Например, после того, как в заказе используется купон, необходимо уведомить систему купонов, чтобы увеличить поток использования. Здесь вы можете использовать номер заказа + идентификатор купона в качестве уникальной идентификации. В начале бизнеса сначала оценивается, существует ли уже этот идентификатор в redis, и если он уже существует, то представитель обработал его. Если он не существует, поместите его в Redis, чтобы установить время истечения срока действия и выполнить бизнес.

    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("orderNo+couponId");
    //先检查这条消息是不是已经消费过了
    if (!Boolean.TRUE.equals(flag)) {
        return;
    }
    //执行业务...
    //消费过的标识存储到 Redis,10 秒过期
    stringRedisTemplate.opsForValue().set("orderNo+couponId","1", Duration.ofSeconds(10L));

Разрешить потребителю выполняться несколько раз, чтобы гарантировать, что данные не будут затронуты

  • Ограничение уникального ключа базы данных

Если потребительский бизнес является новой операцией, мы можем использовать ограничение уникального ключа базы данных, такое как номер купона в таблице потока купонов.Если есть повторное потребление, будут вставлены две записи с одним и тем же номером купона, а база данных выдаст нам ошибку.Убедитесь, что данные базы данных не будут вставлены в два.

  • Идея оптимистичной блокировки базы данных

Если потребительский бизнес — это операция обновления, можно добавить в бизнес-таблицу поле версии.Каждое обновление принимает версию как условие, а после обновления — версию +1. Поскольку innoDB в MySQL является блокировкой строк, когда один из запросов успешно обновляется, может прийти другой запрос. Поскольку версия с номером версии стала равной 2, количество строк, затронутых оператором SQL, которые должны быть обновлены, равно 0, что приведет к не влияет на данные базы данных.

сообщение (стекирование) отставание

Так называемый отставание сообщений обычно возникает из-за того, что скорость потребления потребителя намного ниже, чем скорость отправки сообщений производителем, что приводит к большому количеству сообщений, которые не могут быть обработаны в очереди RabbitMQ.

На самом деле, я не знаю, почему мне так нравится задавать вопросы в интервью... Поскольку скорость потребителей не может угнаться за производителями, достаточно увеличить скорость потребителей! Лично я думаю, что есть следующие идеи

  • Надлежащим образом ограничить ток интерфейса сообщений производителя (не рекомендуется, это повлияет на работу пользователя)
  • Разверните еще несколько потребительских экземпляров (рекомендуется)
  • Надлежащим образом увеличьте количество предварительных выборок, чтобы потребитель мог получать больше сообщений за раз (рекомендуется, можно использовать со вторым решением).

Эпилог

Если эта статья была вам полезна, не забудьте поставить лайк и подписаться. Ваша поддержка является движущей силой для меня продолжать творить!