Решить проблему потери сообщений RabbitMQ и обеспечить надежность сообщений (1)

очередь сообщений RabbitMQ

Промежуточное программное обеспечение сообщений часто используется в работе для решения проблемы разъединения между системами или проблемы высокого параллелизма и сокращения пиков, но как обеспечить надежность сообщения всегда было большой проблемой.При каких обстоятельствах сообщение исчезает? Как предотвратить потерю сообщений? В этой статье давайте поговорим о том, как решается вопрос надежности сообщения RabbitMQ?

Эта статья разделена на три части

  1. Каковы сценарии потери сообщений RabbitMQ?
  2. Как избежать потери сообщений?
  3. Как спроектировать и развернуть ПО промежуточного слоя для обеспечения надежности сообщений?

Каковы сценарии потери сообщений RabbitMQ?

Во-первых, давайте посмотрим на процесс доставки цикла сообщения:

Разделим рисунок на три части, левая, средняя и правая.Каждая часть будет приводить к потере сообщения.Поговорим о том, как теряются сообщения на каждом этапе подробнее:

1. Производитель создает сообщение для сценария потери сообщения RabbitMQ Server.

1) Вызвано проблемами внешней среды: сервер RabbitMQ не может получать сообщения из-за потери сетевых пакетов, сбоя сети и т. д. Поскольку сеть в производственной среде очень сложная, дрожание сети и потеря пакетов очень распространены. об этой проблеме.Как она была решена.

2) На уровне кода, на уровне конфигурации неполный учет приводит к потере сообщения

Дело 1:
Как правило, производитель использует режим подтверждения для доставки сообщения.Если схема недостаточно строгая, например, сервер RabbitMQ отправит сообщение nack, чтобы уведомить производителя после получения сообщения об ошибке.Если производитель не может отслеживать сообщение или ничего не делает, есть риск потери сообщения;

Случай 2:
После того, как производитель отправит сообщение на биржу, отправленный маршрут не будет привязан к очереди, и сообщение будет потеряно.Конкретный пример будет описан ниже, чтобы гарантировать, что непредвиденная ситуация произойдет, даже если она произойдет, она находится в пределах контролируемый диапазон.

2. Сообщения, хранящиеся на RabbitMQ Server, теряются или имеют недостаточную надежность.

1) Сообщение не полностью сохраняется.При перезагрузке машины все сообщения будут потеряны, и даже Очередь исчезнет.

Если: вы сохраняете только сообщение, а обмен и очередь не сохраняются, это сохранение недействительно. Я помню, что приятель в компании забыл сохранить Очередь до и после перезапуска машины, Очередь исчезла, и, естественно, Сообщение также было потеряно.

2) Проблема одноузлового режима, если нода зависнет, сообщение будет недоступно, бизнес может быть парализован, и остается только ждать

Если реализована схема сохранения сообщений, сообщение будет сохраняться на жестком диске, и сообщение не будет потеряно после перезагрузки машины; но есть еще один крайний случай, диск сервера внезапно выходит из строя (компания столкнулась со многими проблем с диском), и сообщение постоянное. Его нельзя изменить, и оно не находится в состоянии высокой доступности. Следует тщательно рассмотреть производственную среду этого режима.

3) Обычный кластерный режим: когда узел зависает, сообщения на узле не могут быть использованы, а затронутый бизнес парализован.Он может только ждать, пока узел будет перезапущен и доступен (на основе сохраняемости сообщений)

Хотя эта модель немного улучшилась, с несколькими узлами достоверность сообщения по-прежнему не гарантируется.Почему?

Поскольку режим кластера RabbitMQ немного особенный, содержимое очереди существует только на определенном узле, а не на всех узлах.Все узлы хранят только структуру сообщения и метаданные (которые можно понимать как индекс, что также для улучшения производительность.Синхронизация содержимого на всех узлах требует дополнительных затрат). Ниже я нарисовал картинку, чтобы представить потерю сообщений в общем кластере:

Здесь есть три узла, обычно один дисковый узел и два узла памяти. Прежде всего, содержимое очереди 1 существует только на узле примечание 1, которое было исправлено при создании очереди. Примечания 2 и примечание 3 хранят только метаданные. Это должно Уточните, что Производитель отправляет сообщение на note2, note2 будет синхронизировать метаданные с другими узлами, а содержимое будет синхронизировано с note1.

Дальше думаем о проблеме Q1 на рисунке, note1 зависает, все Очереди этой ноды временно недоступны, и после восстановления нода будет доступна.

Поговорим о проблеме в Note 2 на картинке.Продюсер отправляет сообщение на note2.Note2 зависает перед синхронизацией note1.Как ваше настроение в это время? . . Конкретные стратегии будут обсуждаться позже.

4) Зеркальный режим: Вышеуказанные проблемы можно решить, но все еще бывают непредвиденные ситуации.

Например, в процессе сохранения персистентного сообщения на жесткий диск текущий узел очереди зависает, жесткий диск узла хранения снова выходит из строя, и сообщение теряется, что делать? Подробности будут даны ниже

3. Сообщение от сервера RabbitMQ к потребителю потеряно

  1. После того, как сторона потребителя получит соответствующее сообщение, сторона потребителя не успела обработать сообщение, и машина на стороне потребителя не работает.В это время, если сообщение не будет обработано должным образом, возникнет риск потери.I мы поговорим о том, как поступить в этой ситуации позже.На стороне потребителя также есть механизм подтверждения.

Как избежать потери сообщений?

Следующее также вводится с трех аспектов:

1. Производитель создает сообщения для гарантии надежности RabbitMQ Server.

2. Как обеспечить сохранение сообщений на RabbitMQ Server

3. Как не потерять сообщения от RabbitMQ Server для потребителей

1. Производитель создает сообщения для гарантии надежности RabbitMQ Server.

В этом процессе сообщение может быть потеряно, например, из-за потери сетевого пакета, сбоя сети и т. д. Как правило, если не предпринимать никаких мер, производитель не может понять, было ли сообщение отправлено на биржу. Если производитель может воспринять Если да, он может выполнить дальнейшие действия по обработке, такие как повторная доставка соответствующего сообщения, чтобы гарантировать надежность сообщения.

1.1 Обычно есть решение: механизм транзакций, предоставляемый протоколом AMQP.

Интерфейс Channel в клиенте RabbitMQ предоставляет несколько методов, связанных с механизмом транзакций:

channel.txSelect

channel.txCommit

channel.txRollback

Скриншот исходного кода выглядит следующим образом: общедоступный интерфейс Channel extendsShutdownNotifier {} интерфейс в пакете com.rabbitmq.client

Прежде чем производитель отправит сообщение, запустите транзакцию через channel.txSelect, а затем отправьте сообщение, В случае сбоя сервера доставки сообщений выполняется откат транзакции channel.txRollback, а затем повторная отправка. Если сервер получает сообщение, зафиксируйте транзакцию channel.txCommit.

Однако мало кто делает это, потому что это синхронная операция.После отправки сообщения отправитель будет заблокирован, чтобы дождаться ответа от сервера RabbitMQ, прежде чем продолжить отправку следующего сообщения.Пропускная способность и производительность созданного сообщения производителем будет значительно улучшено.

1.2 К счастью, RabbitMQ предоставляет улучшенное решение, механизм подтверждения отправителя (подтверждение издателя)

Во-первых, производитель устанавливает канал в режим подтверждения, вызывая метод channel.confirmSelect. После того, как канал перейдет в режим подтверждения, всем сообщениям, опубликованным на канале, будет присвоен уникальный идентификатор (начиная с 1). После того, как сообщение будет доставлено всем сопоставление После того, как очередь установлена, RabbitMQ отправит подтверждение (Basic.Ack) производителю (содержащее уникальный тег deliveryTag сообщения и несколько параметров), который сообщает производителю, что сообщение прибыло в пункт назначения правильно.

На самом деле существует три способа реализации режима подтверждения:

  1. Режим последовательного подтверждения: после того, как производитель отправляет сообщение, он вызывает метод waitForConfirms() и ожидает подтверждения от брокера.Если сервер возвращает false или не возвращается в течение периода ожидания, клиент повторно передает сообщение.
  2. Режим пакетного подтверждения: после того, как производитель отправляет пакет сообщений, он вызывает метод waitForConfirms() и ожидает подтверждения от брокера.
  3. Асинхронный режим подтверждения: укажите метод обратного вызова, который будет вызван производителем после того, как брокер подтвердит одно или несколько сообщений. Давайте рассмотрим эти три режима подтверждения по отдельности.

Серийный номер подтвердить

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
    if (channel.waitForConfirms()) {
        System.out.println("发送成功");
    } else {
        //发送失败这里可进行消息重新投递的逻辑
        System.out.println("发送失败");
    }
}

Режим пакетного подтверждения

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}
if (channel.waitForConfirms()) {
    System.out.println("发送成功");
} else {
    System.out.println("发送失败");
}

Приведенный выше код является простой версией, рабочая среда определенно не отправляется циклически, а в соответствии с бизнес-ситуацией, Каждая клиентская программа должна публиковать сообщения периодически (каждые х секунд) или количественно (каждые х штук) или их комбинацию, а затем ждать подтверждения от сервера. По сравнению с обычным режимом подтверждения пакетная обработка может значительно повысить эффективность подтверждения.

Но нашли ли вы проблемы?

Проблема 1: Сложная логика пакетной отправки.

Вопрос 2: Как только подтверждение возвращает false или истекает время ожидания, клиенту необходимо повторно отправить все сообщения в этом пакете, что приведет к значительному количеству повторяющихся сообщений, а когда сообщения часто теряются, производительность подтверждения пакета должна быть не возрастающей, а падающей.

Асинхронный режим подтверждения

Channel channel = channelManager.getPublisherChannel(namespaceName);
ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//监听类
confirmListener.setChannelManager(channelManager);
confirmListener.setChannel(channel);
confirmListener.setNamespace(namespaceName);
confirmListener.addSuccessCallbacks(successCallbacks);
channel.addConfirmListener(confirmListener);
channel.confirmSelect();//开启confirm模式
AMQP.BasicProperties messageProperties = null;
if (message.getProperty() instanceof AMQP.BasicProperties) {
    messageProperties = (AMQP.BasicProperties) message.getProperty();
}
confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg);
for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}

Асинхронный режим требует от вас написания более сложной реализации кода, класса асинхронного мониторинга и мониторинга уведомлений на стороне сервера. Преимущества асинхронной производительности будут значительно улучшены. После отправки вы можете продолжать отправлять другие сообщения. MQServer уведомляет класс прослушивания ConfirmListener на стороне производства: пользователи могут наследовать интерфейс для реализации своих собственных классов реализации для обработки механизма подтверждения сообщений.Здесь код класса наследования опущен, которым является класс ProxiedConfirmListener выше: Вставьте интерфейс, который будет реализован ниже:

package com.rabbitmq.client;

import java.io.IOException;

/**
 * Implement this interface in order to be notified of Confirm events.
 * Acks represent messages handled successfully; Nacks represent
 * messages lost by the broker.  Note, the lost messages could still
 * have been delivered to consumers, but the broker cannot guarantee
 * this.
 */
public interface ConfirmListener {
    /**
    ** handleAck RabbitMQ消息接收成功的方法,成功后业务可以做的事情
    ** 发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除
    **/
    void handleAck(long deliveryTag, boolean multiple)
        throws IOException;

    //handleNack RabbitMQ消息接收失败的通知方法,用户可以在这里重新投递消息
    void handleNack(long deliveryTag, boolean multiple)
        throws IOException;
}

Вышеупомянутый интерфейс очень интересен, если это вам, то как его реализовать? Как хранить сообщения до их доставки и как acks и nacks обрабатывают сообщения?

Давайте посмотрим на процесс доставки сообщения асинхронного подтверждения:

Объясните эту картинку:

channel1 непрерывно отправляет 1, 2, 3 сообщения на RabbitMQ-Server, уведомление RabbitMQ-Server возвращает уведомление, которое содержит сообщение подтверждения, возвращенное производителю. deliveryTag в сообщении подтверждения содержит порядковый номер сообщения подтверждения, и есть также параметр Multiple =true, указывающий, что все сообщения до этого порядкового номера были обработаны. Таким образом, количество клиентских и серверных уведомлений уменьшается, а производительность класса повышается.

Сообщение, отправленное каналом 3, не удалось, и рабочей стороне необходимо повторно доставить сообщение о доставке, для чего требуется дополнительный код обработки. Так что же нужно сделать на производственной стороне? Поскольку это асинхронно, производственная сторона должна сохранить сообщение, а затем подтвердить, как поступить с сообщением, уведомленным сервером, поэтому проблема, с которой мы сталкиваемся:

Первое: сохраните сообщение перед отправкой

Второе: отслеживать ack и nack и выполнять обработку ответов

Так как хранить?

Согласно нашему анализу, хранилище SortedMap можно использовать для обеспечения упорядоченности, но есть проблема с высоким параллелизмом. В секунду могут быть доставлены тысячи или даже десятки тысяч сообщений, и если подтверждение сообщения приходится ждать сотни миллисекунд, может возникнуть риск переполнения памяти. Поэтому рекомендуется использовать хранилище KV.Хранилище KV обеспечивает высокий уровень параллелизма, высокую производительность и хорошую производительность.Однако для обеспечения высокой доступности KV единственным недостатком является внедрение стороннего промежуточного программного обеспечения, что увеличивает сложность.

Решил вышеописанную проблему, ниже будет еще одна проблема, еще один случай потери сообщения?

Механизм транзакции и механизм подтверждения издателя гарантируют, что сообщение может быть отправлено в RabbitMQ правильно. Значение «отправить в RabbitMQ» здесь означает, что сообщение отправлено в обмен RabbitMQ правильно. Если у обмена нет соответствующей очереди, Тогда сообщение тоже пропадет, что делать?

Здесь есть два решения,

1. Используйте обязательно, чтобы установить true

2. Используйте альтернативный обмен: внедрите сообщения, которые не направляются в очереди.

Давайте посмотрим на метод клиентского кода RabbitMQ.

Опубликовать метод сообщения в классе Channel

    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
            
            

Объясните: в методе basicPublish обязательный и немедленный

/**
     * 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue, 那么会调用basic.return方法将消息返回给生产者<br>
     * 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
     */
    @Setter(AccessLevel.PACKAGE)
    private boolean mandatory = false;

    /**
     * 当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上没有消费者, 那么这条消息不会放入队列中。
     当immediate标志位设置为false时,exchange路由的队列没有消费者时,该消息会通过basic.return方法返还给生产者。
     * RabbitMQ 3.0版本开始去掉了对于immediate参数的支持,对此RabbitMQ官方解释是:这个关键字违背了生产者和消费者之间解耦的特性,因为生产者不关心消息是否被消费者消费掉
     */
    @Setter(AccessLevel.PACKAGE)
    private boolean immediate;
    

Поэтому для обеспечения достоверности сообщения необходимо задать логику отправки кода сообщения. Если нет отдельной формы, установите обязательное значение = false

Существует ключевой момент, который необходимо настроить при использовании обязательного значения true: как производитель получает сообщение, которое не направляется в соответствующую очередь правильно? При вызове channel.addReturnListener для добавления реализации прослушивателя ReturnListener, если отправленное сообщение не направляется в определенную очередь, ReturnListener получит сообщение мониторинга.

channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
                    .BasicProperties basicProperties, byte[] body) throws IOException {
                String message = new String(body);
                //进入该方法表示,没路由到具体的队列
                //监听到消息,可以重新投递或者其它方案来提高消息的可靠性。
                System.out.println("Basic.Return返回的结果是:" + message);
            }
 });

В это время кто-то спросил, что мне делать, если я не хочу усложнять логику программирования производителя, и не хочу, чтобы сообщение было потеряно? К счастью, RabbitMQ предоставляет нечто, называемое альтернативным обменом, что переводится как резервный коммутатор.Для чего это нужно? Проще говоря, он может хранить немаршрутизируемые сообщения в другой очереди обмена и обрабатывать их при необходимости.

Как этого достичь?

Простой, через вебуи можно управлять настройками фона.При создании новой биржи можно задать для нее Аргументы.Этот параметр -альтернативный-обмен.По сути,альтернативный-обмен-это обычная биржа.Наилучший тип-фанаут для легкое управление.

Когда вы отправляете сообщение на свой собственный обмен, если соответствующий ключ не маршрутизируется в очередь, оно будет автоматически передано в очередь, соответствующую альтернативному обмену, по крайней мере, сообщение не будет потеряно.

На следующей картинке показан процесс доставки:

Тогда у кого-то есть вопрос.Выше описаны два способа борьбы со схемой, что отправленное сообщение не может быть направлено в очередь. Каков эффект, если резервный переключатель используется с обязательным параметром?

ответ:Обязательный параметр недействителен

Поскольку места слишком много, я разделю его на другую статью, чтобы рассказать о следующем содержании.

2. Как обеспечить надежность сообщений и высокую доступность сообщений, хранящихся на RabbitMQ Server

3. Как не потерять сообщения от RabbitMQ Server для потребителей

Давайте поговорим о том, как большие заводы используются в производственной среде.

2,3 Для основных моментов перейдите к следующему описанию:Решить проблему потери сообщений RabbitMQ и обеспечить надежность сообщений (2)

END

Если у вас есть какие-либо успехи, пожалуйста, помогите их направить, и в будущем будут лучшие статьи.Ваша поддержка - самая большая мотивация для автора!

Добро пожаловать, чтобы обратить внимание на мою официальную учетную запись: практика архитекторов, получить эксклюзивные учебные ресурсы и ежедневный толчок галантерейных товаров.