1. Обзор предыдущей ситуации
Руководство по использованию RabbitMQ (1) Настройка установки среды RabbitMQ и пример Hello World
Руководство по использованию RabbitMQ (3) Как обеспечить успешную отправку 99,99% сообщений?
До сих пор мы можем гарантировать, что сообщение успешно отправлено производителем на сервер RabbitMQ, и что сообщение не будет потеряно после ненормального состояния сервера RabbitMQ (перезагрузка, простои и т. д.) Возможно, вы считаете, что сообщение должно быть в безопасности сейчас? На самом деле это недостаточно безопасно, если не верите, то читайте дальше.
2. Резюме этой статьи
На самом деле, есть еще один сценарий для рассмотрения:Когда потребитель получает сообщение, бизнес-логика не была обработана, и потребитель вешает трубку, поэтому сообщение также теряется?, Например, когда пользователь размещает заказ, центр заказов отправляет сообщение в очередь в RabbitMQ, а центр поинтов получает это сообщение и намеревается добавить 20 баллов пользователю, разместившему заказ, но баллы не были успешно добавлен, и центр точек зависает сам по себе, вызывая проблемы с данными.
Так как же решить эту проблему?
Чтобы гарантировать, что сообщения будут успешно обработаны потребителями, RabbitMQ предоставляетподтверждение сообщения, В этой статье в основном объясняется, как использовать механизм подтверждения сообщений в RabbitMQ, чтобы убедиться, что сообщения успешно используются потребителями, и избежать потери сообщений, вызванной внезапным простоем потребителя.
3. Включить явный режим подтверждения
в блоге 1Руководство по использованию RabbitMQ (1) Настройка установки среды RabbitMQ и пример Hello World, наш код для запуска потребителя выглядит так:
// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
Дело в том, чтоchannel.basicConsume(QUEUE_NAME, true, consumer);
Второй параметр метода, давайте сначала посмотрим на исходный код basicConsume():
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
return this.basicConsume(queue, autoAck, "", callback);
}
Параметр autoAck здесь относится к тому, подтверждается ли оно автоматически.Если для него установлено значение true, RabbitMQ автоматически установит отправленное сообщение как подтверждение, а затем удалит его из памяти (или с диска), независимо от того, получит ли потребитель сообщение и обработает ли его. это успешно; если установлено значение false, RabbitMQ будет ждать, пока потребитель явно ответит на сигнал подтверждения, прежде чем удалять из памяти (или с диска).
Рекомендуется установить для autoAck значение false, чтобы у потребителей было достаточно времени для обработки сообщений, не беспокоясь о потере сообщений, вызванной простоем потребителя во время обработки сообщений.
В этот момент сообщение в очереди разделено на две части:
- Ожидание доставки сообщения потребителю (часть Ready на рисунке ниже)
- Он был доставлен потребителю, но не получил подтверждающий сигнал от потребителя (Неподготовленная часть на рисунке ниже)
Если RabbitMQ не получил сигнал подтверждения от потребителя, а потребитель, получающий сообщение, был отключен, RabbitMQ организует повторную постановку сообщения в очередь и ожидание доставки следующему потребителю. быть первоначальным потребителем.
RabbitMQ не устанавливает время истечения срока действия для неподтвержденных сообщений.Единственным основанием для принятия решения о необходимости повторной доставки сообщения потребителям является то, было ли разорвано соединение потребителя, которое потребляет сообщение.Причина такого дизайна в том, что RabbitMQ позволяет потребителям для использования сообщения. Сообщения могут занимать много времени.
Для простоты понимания возьмем конкретный пример, и будем использовать DurableProducer выше со слов производителя:
package com.zwwhnly.springbootaction.rabbitmq.durable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableProducer {
private final static String EXCHANGE_NAME = "durable-exchange";
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 发送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
// 关闭频道和连接
channel.close();
connection.close();
}
}
Затем создайте новый потребительский класс AckConsumer:
package com.zwwhnly.springbootaction.rabbitmq.ack;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class AckConsumer {
private final static String QUEUE_NAME = "durable-queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建队列消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
int result = 1 / 0;
System.out.println("Received Message '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Сначала мы устанавливаем параметр autoAck в значение true, то есть автоматическое подтверждение, и намеренно пишем исключение при потреблении сообщений, затем запускаем клиент-производитель для записи сообщения в очередь, а затем запускаем клиент-потребитель и обнаруживаем, что сообщение не потребляется успешно, но исчез:
Затем мы устанавливаем для autoAck значение false:
channel.basicConsume(QUEUE_NAME, false, consumer);
Запустите клиент производителя еще раз, чтобы написать сообщение в очередь, а затем запустить клиент Потребителя. В это время, хотя потребительский клиент все еще имеет код исключения кода, сообщение все еще находится в очереди:
Затем мы удаляем код исключения в клиенте-потребителе, перезапускаем клиент-потребитель и обнаруживаем, что потребление сообщения прошло успешно, но сообщение не было подтверждено:
Вручную остановите клиент-потребитель и обнаружите, что сообщение снова находится в состоянии «Готово» и готово к повторной доставке:
Причина, по которой сообщение потребляется, но все еще находится в состоянии Unacked, заключается в том, что мы не добавили в код явный код подтверждения:
String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
deliveryTag можно рассматривать как номер сообщения, который представляет собой 64-битное целое число.
На данный момент запустите потребительский клиент и обнаружите, что сообщение успешно потребляется и удалена из очереди:
4. Исходный код и ссылка
Адрес источника:GitHub.com/Где находится Ухань/SPR…, добро пожаловать на скачивание.
«Практическое руководство RabbitMQ» Чжу Чжунхуа