1. Обзор предыдущей ситуации
Руководство по использованию RabbitMQ (1) Настройка установки среды RabbitMQ и пример Hello World
Вскоре после того, как два вышеуказанных блога были опубликованы, осторожные пользователи сети прокомментировали созданные очереди и отправленные сообщения.Если служба RabbitMQ будет перезапущена до запуска программы-потребителя, очереди и сообщения будут потеряны.
Это приводит к очень важному вопросу, который часто задают в интервью:Как при использовании RabbitMQ гарантировать, что сообщения не теряются в наибольшей степени и правильно потребляются?
2. Резюме этой статьи
RabbitMQ предоставляет следующие механизмы для решения этой проблемы:
- Производитель подтверждает
- Упорство
- Ручное подтверждение
В этом блоге мы сначала объясним механизм подтверждения производителя, а остальные механизмы будут объяснены в отдельных блогах позже.
3. Подтверждение производителя
Чтобы гарантировать, что сообщение не будет потеряно, мы должны сначала убедиться, что производитель может успешно отправить сообщение на сервер RabbitMQ.
Но в предыдущем примере, когда производитель отправил сообщение, правильно ли оно прибыло на сервер? Если никакая специальная настройка не выполняется, операция отправки сообщения по умолчанию не возвращает никакого сообщения производителю, то есть производитель не знает, правильно ли сообщение прибыло на сервер по умолчанию.
Мы также можем узнать из возвращаемого типа метода basicPublish:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
this.basicPublish(exchange, routingKey, false, props, body);
}
Для лучшего понимания мы будем использовать предыдущий класс производителя Producer вchannel.queueDeclare(QUEUE_NAME, false, false, false, null);
Примечания:
package com.zwwhnly.springbootaction.rabbitmq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列,不存在的话自动创建
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
В этот момент код запускается, потому что очереди не существует, сообщение должно быть некуда хранить, но программа не выдает ошибку, то есть сообщение потеряно, но мы этого не знаем.
RabblitMQ предлагает два решения этой проблемы:
- Реализуется механизмом транзакций
- Через механизм подтверждения отправителя (подтверждения издателя)
4. Механизм сделки
Существует три метода, связанных с механизмом транзакций в клиенте RabblitMQ:
- channel.txSelect: используется для установки текущего канала в режим транзакций.
- channel.txCommit: используется для фиксации транзакций
- channel.txRollback: используется для отката транзакции
Создайте новый класс производителя транзакций TransactionProducer, код выглядит следующим образом:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TransactionProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 指定一个队列,不存在的话自动创建
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.txSelect();
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
// 关闭频道和连接
channel.close();
connection.close();
}
}
Запустите код и убедитесь, что очередь успешно добавлена и сообщение успешно отправлено:
Немного изменим код, чтобы увидеть откат транзакции механизма исключений:
try {
channel.txSelect();
// 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
int result = 1 / 0;
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
потому чтоint result = 1 / 0;
Обязательно сработает исключение java.lang.ArithmeticException, поэтому транзакция будет откатана и отправка сообщения не удастся:
Если вы хотите отправить несколько сообщений, вы можете поместить в тело цикла методы channel.basicPublish, channel.txCommit и другие, как показано ниже:
channel.txSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 发送消息
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.txCommit();
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
e.printStackTrace();
channel.txRollback();
}
}
Хотя транзакция может решить проблему подтверждения сообщения между отправителем сообщения и RabbitMQ, транзакция может быть успешно отправлена только в том случае, если сообщение успешно получено RabbitMQ, в противном случае транзакция может быть отброшена после перехвата исключения. Однако использование механизма транзакций приведет к снижению производительности RabbitMQ, поэтому рекомендуется использовать упомянутый ниже механизм подтверждения отправителя.
5. Механизм подтверждения отправителя
Механизм подтверждения отправителя означает, что производитель устанавливает канал в режим подтверждения. После перехода канала в режим подтверждения всем сообщениям, опубликованным на канале, будет присвоен уникальный идентификатор (начиная с 1). Сервер RabbitMQ, RabbitMQ отправит подтверждение (Basic.Ack) производителю (содержащее уникальный идентификатор сообщения), которое сообщает производителю, что сообщение прибыло в пункт назначения правильно.
Если RabbitMQ теряет сообщение из-за собственной внутренней ошибки, отправляется команда nack (Basic.Nack), и приложение-производитель также может обработать команду nack в методе обратного вызова.
Если сообщение и очередь являются устойчивыми, то сообщение подтверждения отправляется после того, как сообщение будет записано на диск.
Механизм транзакции блокирует отправителя после отправки сообщения, ожидая ответа от RabbitMQ, прежде чем продолжить отправку следующего сообщения.
Напротив, самым большим преимуществом механизма подтверждения отправителя является то, что он является асинхронным после публикации сообщения. Приложение-производитель может продолжать отправлять следующее сообщение, ожидая, пока канал вернет подтверждение.Когда сообщение будет окончательно подтверждено, приложение-производитель может обработать подтверждающее сообщение с помощью метода обратного вызова.
5.1 Обычное подтверждение
Создайте новый производственный класс подтверждения NormalConfirmProducer, код выглядит следующим образом:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class NormalConfirmProducer {
private final static String EXCHANGE_NAME = "normal-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
try {
channel.confirmSelect();
// 发送消息
String message = "normal confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
channel.confirmSelect(); Установить канал в режим подтверждения.
channel.waitForConfirms(); Ожидание подтверждающего сообщения для отправки сообщения, если отправка прошла успешно, возвращается true, если отправка не удалась, возвращается false.
Если вы хотите отправить несколько сообщений, вы можете поместить в тело цикла методы channel.basicPublish, channel.waitForConfirms и другие методы, как показано ниже:
channel.confirmSelect();
int loopTimes = 10;
for (int i = 0; i < loopTimes; i++) {
try {
// 发送消息
String message = "normal confirm test" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("send message success");
} else {
System.out.println("send message failed");
// do something else...
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
результат операции:
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
send message success
Если режим подтверждения канала не включен, вызов channel.waitForConfirms() сообщит об ошибке:
Меры предосторожности:
1) Механизм транзакции и механизм подтверждения издателя являются взаимоисключающими и не могут сосуществовать.
RabbitMQ сообщит об ошибке, если будет предпринята попытка снова установить канал, который уже открыл режим транзакции, в режим подтверждения издателя:
channel.txSelect();
channel.confirmSelect();
RabbitMQ также сообщит об ошибке, если будет предпринята попытка установить канал, в котором включен режим подтверждения издателя, в режим транзакции:
channel.confirmSelect();
channel.txSelect();
2) Механизм транзакции и механизм подтверждения издателя гарантируют, что сообщение может быть отправлено в RabbitMQ правильно Значение «отправить в RabbitMQ» здесь означает, что сообщение правильно отправлено в обмен RabbitMQ, если обмен не имеет соответствующего queue , то сообщение также будет потеряно. Поэтому при использовании этих двух механизмов убедитесь, что задействованные обмены могут иметь совпадающие очереди.
Например, сообщение, отправленное вышеприведенным классом NormalConfirmProducer, отправляется на биржу normal-confirm-exchange, но биржа не привязана к какой-либо очереди.С точки зрения бизнеса сообщение все равно потеряно.
Обычный режим подтверждения заключается в вызове метода channel.waitForConfirms() каждый раз при отправке сообщения, а затем в ожидании подтверждения от сервера, что на самом деле является методом последовательного синхронного ожидания. Следовательно, по сравнению с механизмом транзакций улучшение производительности невелико.
5.2 Пакетное подтверждение
Режим пакетного подтверждения заключается в вызове метода channel.waitForConfirms() после отправки каждого пакета сообщений и ожидании возврата подтверждения от сервера, поэтому он имеет лучшую производительность, чем обычный режим подтверждения в 5.1.
Но недостатком является то, что при возврате к Basic.Nack или тайм-ауте клиенту производителя необходимо повторно отправить все сообщения в этом пакете, что приведет к очевидному количеству дублирующихся сообщений.Если сообщения часто теряются, пакетное подтверждение Производительность режима должна не увеличиваться, а уменьшаться.
Пример кода:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class BatchConfirmProducer {
private final static String EXCHANGE_NAME = "batch-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
int msgCount = 0;
BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
try {
channel.confirmSelect();
while (msgCount <= batchCount) {
String message = "batch confirm test";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
// 将发送出去的消息存入缓存中,缓存可以是一个ArrayList或者BlockingQueue之类的
blockingQueue.add(message);
if (++msgCount >= batchCount) {
try {
if (channel.waitForConfirms()) {
// 将缓存中的消息清空
blockingQueue.clear();
} else {
// 将缓存中的消息重新发送
}
} catch (InterruptedException e) {
e.printStackTrace();
// 将缓存中的消息重新发送
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
5.3 Асинхронное подтверждение
Асинхронный режим подтверждения заключается в добавлении интерфейса обратного вызова ConfirmListener на клиент-производитель и переписывании методов интерфейса handAck() и handNack() для обработки Basic.Ack и Basic.Nack, возвращаемых RabblitMQ, соответственно.
Оба метода имеют два параметра. Первый параметр, deliveryTag, используется для обозначения уникального порядкового номера сообщения. Второй параметр,multiple, указывает, есть ли несколько подтверждений.Значение true для нескольких подтверждений, а значение false , Представительство является одним подтверждением.
Образец кода:
package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
public class AsyncConfirmProducer {
private final static String EXCHANGE_NAME = "async-confirm-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
// 设置 RabbitMQ 的主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
int batchCount = 100;
long msgCount = 1;
SortedSet<Long> confirmSet = new TreeSet<Long>();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Ack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
// 注意这里需要添加处理消息重发的场景
}
});
// 演示发送100个消息
while (msgCount <= batchCount) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(EXCHANGE_NAME, "", null, "async confirm test".getBytes());
confirmSet.add(nextSeqNo);
msgCount = nextSeqNo;
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
результат операции:
Ack, SeqNo: 1, несколько: false
Ack, SeqNo: 2, несколько: false
Ack, SeqNo: 3, несколько: false
Ack, SeqNo: 4, несколько: false
Ack, SeqNo: 5, несколько: false
Ack, SeqNo: 6, несколько: false
Подтверждение, SeqNo: 7, несколько: ложь
Ack, SeqNo: 8, несколько: false
Ack, SeqNo: 9, несколько: false
Ack, SeqNo: 10, несколько: false
Примечание. После нескольких запусков обнаруживается, что выходные результаты каждого запуска отличаются, что указывает на то, что сообщение подтверждения, возвращаемое RabbitMQ производителю, не возвращается в пакете фиксированного размера.
6. Сравнение производительности
На данный момент мы узнали, что 4 режима (механизм транзакции, обычное подтверждение, пакетное подтверждение и асинхронное подтверждение) могут обеспечить подтверждение производителя. Давайте сравним их производительность и просто изменим количество сообщений, отправляемых в приведенном выше примере кода. Например, 10000. , следующие трудоемкие 4 режима:
Отправка 10000 сообщений, механизм транзакции занимает 2103
Отправка 10000 сообщений, обычный механизм подтверждения занимает время: 1483
При отправке 10 000 сообщений механизм пакетного подтверждения требует времени: 281
Отправка 10000 сообщений, асинхронный механизм подтверждения требует времени: 214
Видно, что механизм транзакций является самым медленным.Хотя обычный механизм подтверждения был улучшен, но ненамного.Пакетное подтверждение и асинхронное подтверждение имеют наилучшую производительность.Вы можете выбрать, какой механизм использовать в соответствии с вашими предпочтениями.Лично , рекомендуется использовать механизм асинхронного подтверждения.
7. Исходный код и ссылка
Адрес источника:GitHub.com/Где находится Ухань/SPR…, добро пожаловать на скачивание.
«Практическое руководство RabbitMQ» Чжу Чжунхуа