RabbitMQ предоставляет нам решение для обеспечения надежности сообщений на уровне приложения, что так же важно, как узлы RabbitMQ для обеспечения надежности сообщений.
1. Подтверждение потребительского сообщения
Когда Потребитель используетautoAck=true
При подписке на сообщения узла RabbitMQ это может быть связано с сетевыми причинами или могут быть исключения, когда Consumerm обрабатывает сообщения, или сервер не работает, что может привести к потере сообщений.
когдаautoAck=true
Когда сообщение отправлено, RabbitMQ автоматически установит отправленное сообщение в качестве подтверждения, а затем удалит его из памяти (или с диска), независимо от того, потребляет ли потребитель эти сообщения на самом деле. Чтобы этого не произошло, RabbitMQ предоставляет потребителю способ подтверждения обработки сообщений.На данный момент нам нужно установитьautoAck=false
autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
когда мы устанавливаемautoAck=false
В то же время узел RabbitMQ будет ждать метки подтверждения после того, как потребитель обработает сообщение, и удалит сообщение только тогда, когда RabbitMQ получит метку подтверждения. Если RabbitMQ не получил подтверждения от потребителя и подтверждает, что потребитель, который использовал сообщение, отключился, RabbitMQ повторно вставит сообщение в заголовок очереди сообщений и перенаправит его выжившему потребителю для потребления.
RabbitMQ не получил сигнал подтверждения от потребителя. Это может быть потому, что это долгосрочная задача. Это не означает, что потребитель повесил трубку, поэтому только тогда, когда RabbitMQ подтвердит, что канал действительно потерян, он будет неподтвержденным сообщения будут переадресованы.
Потребитель должен подтвердить, что сообщение было использовано, и сообщить RabbitMQ, как RabbitMQ идентифицирует каждое сообщение? ответdeliveryTag
.
Когда потребитель подписывается на сообщение RabbitMQ, RabbitMQ вызываетbasic.deliver
Отправьте сообщение потребителю, этот метод принимает монотонно возрастающее положительное целое числоdeliveryTag
может однозначно идентифицировать это сообщение здесьchannel
передача дальше.deliveryTag
Область примененияchannel
, поэтому сообщение должно быть подтверждено на том же канале, иначе оно вызоветunknown delivery tag
аномальный. Конечно, я думаю, что это действительно редкость.
RabbitMQ предоставляет нам три метода подтверждения или отклонения сообщений на стороне потребителя:
-
com.rabbitmq.client.Channel#basicAck()
/** * @param deliveryTag 交付标记 * @param multiple 是否批量确认,true:将一次性ack这条 channel上所有小于deliveryTag的消息。 */ void basicAck(long deliveryTag, boolean multiple) throws IOException;
-
com.rabbitmq.client.Channel#basicNack()
/** * @param deliveryTag 交付标记 * @param multiple 是否批量拒绝,true:将一次性reject这条 channel上所有小于deliveryTag的消息。 * @param requeue 被拒绝的消息是否重回队列 */ void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
-
com.rabbitmq.client.Channel#basicReject()
// channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息 void basicReject(long deliveryTag, boolean requeue) throws IOException;
Как потребитель подтверждает сообщение?
- Consumer
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
BuiltinExchangeType.DIRECT);
boolean durable = true;
channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false, false, null);
channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("ConsumerTag is [" + consumerTag + "]," +
" [x] Received '" + message + "'," +
" DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
" Thread is [" + Thread.currentThread().getName() + "]"
);
// 模拟处理消息的耗时
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回消息确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Done");
};
boolean autoAck = false;
channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
});
}
- Pubisher
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
BuiltinExchangeType.DIRECT);
for (int i = 1; i <= 10; i++) {
// 推送持久化消息
String message = "Send message " + i;
channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", false, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] sent ' " + message + " '");
}
}
Сначала запускаем производителя для отправки 10 сообщений потребителю, в это время потребитель еще не запущен (объявлена очередь, соответствующая обмену).
На данный момент в очереди хранится 10 сообщений, ожидающих обработки потребителями. Затем мы запускаем потребителя, и все сообщения в очереди сообщений отправляются потребителю, в это время потребитель потребляет 3 сообщения и возвращает подтверждение в RabbitMQ.
Затем мы ОСТАНАВЛИВАЕМ потребителя, чтобы имитировать ситуацию, когда потребитель зависает.В это время 7 сообщений, которые не были подтверждены, восстанавливаются в очереди сообщений, ожидая отправки выжившим потребителям.
Теперь давайте обсудим другую проблему: что, если возникнет исключение, когда потребитель обрабатывает сообщение? Единственным основанием для RabbitMQ решить, должно ли сообщение быть доставлено повторно, является то, отключен ли канал потребителя.Нужно ли сообщение повторно обрабатывать в это время? Эта ситуация должна оцениваться в соответствии с конкретным сценарием использования и не обязательно требует, чтобы RabbitMQ повторно доставил сообщение. Но Кролик дает нам возможность отклонить сообщение и вернуть его в очередь.
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
BuiltinExchangeType.DIRECT);
boolean durable = true;
channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false, false, null);
channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
String message = new String(delivery.getBody(), "UTF-8");
if (2 == delivery.getEnvelope().getDeliveryTag()) {
throw new IllegalStateException("消息无法被正常处理, DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]");
}
System.out.println("ConsumerTag is [" + consumerTag + "]," +
" [x] Received '" + message + "'," +
" DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
" Thread is [" + Thread.currentThread().getName() + "]"
);
// 模拟处理消息的耗时
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回消息确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Done");
} catch (Exception e) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
boolean autoAck = false;
channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
});
}
Однако вообще не рекомендуется использовать функцию возврата в очередь, так как отклоненное сообщение будет вставлено в начало очереди сообщений, что легко приведет к попаданию программы в бесконечный цикл. очередь писем вместо функции возврата в очередь.
2. Подтверждение сообщения издателя
Когда производитель отправляет сообщение на биржу RabbitMQ, у нас нет возможности гарантировать, что сообщение может достичь биржи.Чтобы обеспечить надежную отправку сообщения, RabbitMQ предоставляет два решения: сообщение о транзакции и подтверждение сообщения, поскольку сообщения о транзакциях будет серьезно снижать производительность RabbitMQ, поэтому он в основном не используется. Мы можем использовать асинхронное подтверждение сообщения, чтобы гарантировать, что отправленное сообщение должно быть доставлено в RabbitMQ.
static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
BuiltinExchangeType.DIRECT);
// confirm模式
channel.confirmSelect();
channel.addConfirmListener(
(deliveryTag, multiple) -> {
System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
},
(deliveryTag, multiple) -> {
}
);
channel.addReturnListener(returnMessage -> {
String message = new String(returnMessage.getBody());
System.out.println("No routing message " + message);
});
boolean mandatory = true;
for (int i = 1; i <= 100; i++) {
// 推送持久化消息
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Send message " + nextSeqNo;
channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
confirmSet.add(nextSeqNo);
System.out.println(" [x] sent ' " + message + " '");
}
}
Когда сообщение поступает в RabbitMQ и сохраняется, оно возвращаетdeliveryTag
, Подтверждение сообщения асинхронного мониторинга может быть пакетным или одиночным, мы можем поддерживать упорядоченный набор для хранения неподтвержденных сообщений для повторной отправки в RabbitMQ.
Другая проблема заключается в том, что если у биржи нет подходящей очереди или очередь не пришла и не была объявлена, сообщение также может быть потеряно.В это время мы можем установитьmandatory=true
,Добавить кreturnListener
для обработки таких немаршрутизируемых сообщений. Вы также можете установить переключательalternate-exchange
параметры для настройки резервного коммутатора для хранения этих немаршрутизируемых сообщений, чтобы уменьшить сложность программы-отправителя.
static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
BuiltinExchangeType.DIRECT, true, false, createBackUpExchange(channel));
// confirm模式
channel.confirmSelect();
channel.addConfirmListener(
(deliveryTag, multiple) -> {
System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
},
(deliveryTag, multiple) -> {
}
);
boolean mandatory = false;
for (int i = 1; i <= 100; i++) {
// 推送持久化消息
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Send message " + nextSeqNo;
channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
confirmSet.add(nextSeqNo);
System.out.println(" [x] sent ' " + message + " '");
}
}
public static Map<String, Object> createBackUpExchange(Channel channel) throws Exception {
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", ExchangeCostant.EXCHANGE_BACKUP);
// 声明一个广播类型的交换机
channel.exchangeDeclare(ExchangeCostant.EXCHANGE_BACKUP, BuiltinExchangeType.FANOUT, true, false, null);
channel.queueDeclare(QueueCostant.BACK_UP, true, false, false, null);
channel.queueBind(QueueCostant.BACK_UP, ExchangeCostant.EXCHANGE_BACKUP, "");
return arguments;
}
3. Узел RabbitMQ
Обеспечение надежности сообщений на уровне приложения значительно улучшило безопасность приложения, но когда узел RabbitMQ перезапустится или выйдет из строя, сообщение все равно будет потеряно, поэтому нам также нужно установить постоянство очереди и постоянство Убедитесь, что сообщения могут быть восстановлены после выхода из строя или перезапуска узла.
Если из-за сбоя одной машины машина не сможет восстановиться, сообщения все равно будут потеряны, поэтому для критических сообщений нам необходимо настроить зеркальные очереди и кластеры, чтобы обеспечить высокую доступность служб сообщений.
Справочная статья:blog.CSDN.net/U013256816/…