Обзор
В RabbitMQ даже после того, как очередь, обмен, сообщение и т. д. настроены как постоянные, все равно нет гарантии, что данные не будут потеряны на 100%. Чтобы понять, что сообщение не потеряно, нам нужно обрабатывать его со стороны потребителя и со стороны производителя одновременно. В этой статье сначала представлена сторона потребителя.В AMPQ-0-9-1 определен механизм подтверждения сообщений от потребителей к RabbitMQ.Этот механизм может гарантировать, что сообщения могут правильно достигать стороны потребителя от RabbitMQ. В этой статье рассказывается, как реализовать механизм подтверждения сообщений на стороне потребителя в RabbitMQ, включая следующее содержимое.
- 1 Механизм реализации потребителей
- 2 Реализация кода на стороне потребителя
- 3 Используйте wireshark для подпакета и анализа ключевых пакетов подтверждения сообщения
- 4 Примечания по использованию механизма подтверждения сообщения
Механизм подтверждения доставки потребителю
Способ подтверждения на стороне потребителя
В RabbitMQ есть два метода подтверждения:
-
1 Метод автоматического подтверждения: RabbitMQ успешно отправляет сообщение (то есть сообщение успешно записывается в TCP-сокет) и сразу же считает, что доставка была обработана правильно, независимо от того, успешно ли обработала доставку сторона потребителя.
В режиме автоподтверждения сообщение считается успешно доставленным после его отправки, что также известно как «выстрелил-забыл».
преимущество: пропускная способность в этом режиме очень высока.
недостаток: A. Возможна ситуация, когда доставка теряется.В отличие от режима ручного подтверждения, если TCP-соединение или канал потребителя закрываются до того, как сообщение будет успешно обменено, сообщение будет потеряно.B.Проблема перегрузки на стороне потребителя . В ручном режиме квитирования можно установить максимальное количество сообщений, которые будут обрабатываться за один раз, в то время как в автоматическом режиме это значение установить нельзя. Следовательно, потребители могут аварийно завершать работу из-за того, что сообщения не могут быть обработаны вовремя, накапливаются в памяти и память исчерпана. C. Этот режим рекомендуется использовать только в сценариях, где потребители могут быстро и стабильно обрабатывать доставленные сообщения. -
2 Метод ручной обработки: после того, как потребитель получил сообщение, после ручного вызова basic.ack/basic.nack/basic.reject, RabbitMQ считает доставку успешной только после получения этих сообщений.
Методы ручного подтверждения сообщений:
§ basic.ackдля положительного подтверждения
§ basic.nackдля отрицательного подтверждения (примечание: это расширение RabbitMQ для AMQP 0-9-1)
§ basic.rejectИспользуется для отрицательных подтверждений, но имеет ограничение по сравнению с basic.nack: одновременно может быть отклонено только одно сообщение.
Вышеупомянутые три метода на стороне потребителя указывают, что сообщение было доставлено правильно, но basic.ack указывает, что сообщение было правильно обработано, а basic.nack и basic.reject указывают, что оно не было правильно обработано, но сообщение по-прежнему необходимо удалить в RabbitMQ.
Эффективность доставки при ручном режиме подтверждения несколько ниже, чем при автоматическом, но она может компенсировать недостаток автоматического режима подтверждения.
Пакетное подтверждение доставки вручную
Помимо подтверждения одного сообщения вручную, вы также можете подтвердить одновременно несколько сообщений. Чтобы уменьшить сетевой трафик, вы можете выполнять пакетное подтверждение вручную. При ответе установите для поля Multiple в basic.nack значение true, чтобы одновременно подтвердить, что delivery_tag и сообщение о доставке меньше, чем значение delivery_tag. от 5 до 8 будут подтверждены после успешного выполнения метода. Если для параметра Multiple установлено значение false, то доставки 5, 6 и 7 по-прежнему не будут подтверждены.
Уникальный код доставки: теги доставки
После того, как потребитель зарегистрируется в RabbitMQ, когда RabbitMQ использует basic.deliver для доставки сообщения потребителю, тело сообщения будет содержать тег доставки, который однозначно идентифицирует доставку.На том же канале это значение уникально. Значение тега доставки имеет длину 64 бита. Значение начинается с 1 и увеличивается на 1 при каждой отправке сообщения. Максимальное значение – 9 223372036854775807. Когда потребитель отвечает на сообщение, он принимает этот параметр, чтобы сообщить RabbitMQ, что на определенную доставку получен правильный ответ.
Код подтверждения доставки сообщения на стороне потребителя
проект:
Проект: Код Путь:
Код потребителя:
Код: Ключевые моменты
а. Channel.basicConsume настроен на получение неавтоматического подтверждения
б) После обработки сообщения вызовите channel.basicAck для ручного подтверждения сообщения.
// 默认消费者实现
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [ConsumerConfirmRecv] Received '" + message + "'");
// 消息正向确认
channel.basicAck(envelope.getDeliveryTag(),true);
// 消息否定确认: 如果设置multiple=false,requeue值启作用,如果设置multiple=true,则requeue无论设置什么值,后台统一处理成true
// channel.basicNack(envelope.getDeliveryTag(),false, false);
}
};
// 接收消息:设置非自动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
Анализ захвата пакетов на стороне потребителя
Выполните приведенный выше код, перехватите и проанализируйте ключевые пакеты. Если он отличается от обычного пакета сообщения, см. общий анализ пакета на стороне потребителя.ПО промежуточного слоя серии девять RabbmtiMQ изучает протокол AMQP с помощью захвата wireshark
Положительный акк — Basic.ack
Если он отличается от общего пакета сообщений, в этой статье показан общий анализ пакета на стороне потребителя, но добавлен пакет Basic.ack.
Кадры 131-132 RabbitMQ отправляет сообщение потребителю, а потребитель дает отрицательное подтверждение.
131 фрейм RabbitMQ отправляет сообщения потребителям
Frame 131: 269 bytes on wire (2152 bits), 269 bytes captured (2152 bits) on interface 0
Ethernet II, Src: PcsCompu_48:72:ad (08:00:27:48:72:ad), Dst: Giga-Byt_bf:ae:ce (40:8d:5c:bf:ae:ce)
Internet Protocol Version 4, Src: 10.240.80.147, Dst: 10.240.80.99
Transmission Control Protocol, Src Port: 5672, Dst Port: 49877, Seq: 600, Ack: 642, Len: 215
# 返回Consume的调用结果
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 36
Class: Basic (60)
Method: Consume-Ok (21)
Arguments
# 此消息的消费者的编号
Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg
# 要投递的消息参数
Advanced Message Queueing Protocol
Type: Method (1)
Channel: 1
Length: 87
Class: Basic (60)
Method: Deliver (60)
Arguments
# 要投递的消息者标志
Consumer-Tag: amq.ctag-AhhpMhEMC5VuWcny42rObg
# 消息的编号,从1开始
Delivery-Tag: 1
# 是否是重新投递的消息,如果消息是第一次投递,则此值是false,如果此消息是重新投递的,则此值为true
.... ...0 = Redelivered: False
# 来源的交换机
Exchange: consumerconfirm-exchange
# 路由键
Routing-Key: consumer-confirm
# 要投递的消息的头
Advanced Message Queueing Protocol
Type: Content header (2)
Channel: 1
Length: 27
Class ID: Basic (60)
Weight: 0
Body size: 33
Property flags: 0x9800
# 消息的属性
Properties
Content-Type: text/plain
Delivery-Mode: 2
Priority: 0
# 要投递的消息的内容
Advanced Message Queueing Protocol
Type: Content body (3)
Channel: 1
Length: 33
Payload: 436f6e73756d6572436f6e6669726d53656e642131353137...
132 кадра Подтверждение сообщения потребителем
Отрицательный акк — Basic.Nack
Если он отличается от общего пакета сообщений, в этой статье показан общий анализ пакета на стороне потребителя, но добавлен пакет Basic.Nack.
275-276 кадровRabbitMQ отправляет сообщение потребителю, а потребитель выполняет отрицательное подтверждение.
275 кадровRabbitMQ отправляет сообщения потребителям, что аналогично содержимому в предыдущем разделе, здесь опущенном.
276 кадровСторона потребителя выполняет отрицательное подтверждение, которое имеет больше атрибутов Requeue, чем basic.ack.
Другие механизмы на стороне потребителя для обеспечения правильной обработки данных
Для корректной обработки данных на стороне потребителя также можно установить QoS и сбой доставки.
Настройка предварительной выборки канала (QoS)
Работает только в ручном режиме подтверждения для сообщений.
Чтобы потребитель не обрабатывал слишком много сообщений одновременно, вы можете установить максимальное значение предварительной выборки через basic.qos. Это значение определяет максимальное количество неподтвержденных сообщений, разрешенных на канале. Как только количество неподтвержденных сообщений достигнет настроенного значения, RabbitMQ прекратит доставку дополнительных сообщений в канал до тех пор, пока не будет подтверждено хотя бы одно неподтвержденное сообщение.
Примечание: настройки предварительной выборки канала не включены в basic.get ("pull API"), даже в режиме ручного подтверждения сообщения.
Режим подтверждения потребителя, предварительная выборка и пропускная способность
Параметры, которые больше всего влияют на пропускную способность в RabbitMQ: режим подтверждения сообщения и значение предварительной выборки QoS режим автоматического подтверждения сообщения или установка значения предварительной выборки QoS на бесконечность, хотя это может максимизировать скорость доставки сообщений, оно не обрабатывается вовремя на потребителе Количество сообщений также будет увеличиваться, тем самым увеличивая потребление оперативной памяти потребителями и используя потребительскую сторону для сбоя. Таким образом, два вышеуказанных случая нужно использовать с осторожностью.
RabbitMQ официально рекомендует устанавливать значение предварительной выборки QoS в диапазоне от 100 до 300, что обычно обеспечивает наилучшую пропускную способность и не приводит к сбою потребителей.
Когда потребитель выходит из строя или теряет соединение: автоматическая повторная постановка в очередь
В режиме ручного подтверждения сообщения, если все каналы или соединения для доставки сообщений внезапно закрываются (включая потерю TCP-соединений на стороне потребителя, зависание приложения (процесса) потребителя и исключения протокола на уровне канала), если какое-либо из доставленных сообщений происходят Сообщения, которые не подтверждены потребителем, автоматически помещаются в очередь повторно.
Обратите внимание, что клиенту требуется некоторое время, чтобы обнаружить, что обнаружение соединения недоступно, поэтому будет период времени, когда все сообщения будут повторно доставлены.
Из-за возможной повторной доставки сообщений все интерфейсы на стороне потребителя должны быть гарантированно идемпотентными.
Несколько подтверждений и подтверждений для неизвестных delivery_tags
Если потребитель имеет несколько подтверждений для одного и того же тега delivery_tag, выдается исключение канала PRECONDITION_FAILED. Исключение канала также генерируется, если подтвержден неизвестный тег delivery_tag.
код
Смотрите код github для всех подробных кодов, пожалуйста, используйте как можно большетег v0.15, не используйте мастер, потому что мастер всегда меняется, нет гарантии, что код в статье и код на гитхабе всегда будут одинаковыми