карта разума
1. Проанализируйте причины потери данных
Чтобы проанализировать потерю сообщений RabbitMQ, давайте рассмотрим процесс отправки сообщения от производителя к потребителю:
Видно, что весь процесс сообщения должен проходить через две передачи по сети:От производителя к серверу RabbitMQ, от сервера RabbitMQ к потребителю.
Хранится в очереди (Queue) до того, как потребитель не использует его.
Таким образом, известно, что существует три сценария, в которых произойдет потеря сообщения:
- Сохраненные в очереди, если очередь не сохраняет сообщение, данные будут потеряны, если сервер RabbitMQ выйдет из строя и перезапустится.
- Когда производитель отправляет сообщение на сервер RabbitMQ, если сервер RabbitMQ выйдет из строя и остановит службу, сообщение будет потеряно.
- Потребитель получает потребление данных, хранящееся в очереди, с сервера RabbitMQ, но программа-потребитель дает сбой или не может правильно использовать, что приводит к потере данных.
Для приведенных выше трех сценариев RabbitMQ предоставляет три решения, а именно сохранение сообщений, механизм подтверждения и механизм транзакции ACK.
2. Сохранение сообщения
RabbitMQ поддерживает постоянство сообщений, и необходимо установить постоянство сообщений: Exchange является постоянным и Queue постоянным, чтобы при отправке сообщения на сервер RabbitMQ оно было постоянным.
Сначала взгляните на диаграмму классов коммутатора Exchange:
Глядя на эту диаграмму классов, на самом деле нужно объяснить, что все четыре переключателя, представленные в предыдущей статье, являются подклассами абстрактного класса AbstractExchange, поэтому, согласно характеристикам java,Создание экземпляра подкласса сначала вызовет конструктор суперкласса, что такое конструктор родительского класса, то есть AbstractExchange?
Как видно из комментариев вышеПрочный параметр указывает, является ли он долговечным. Значение по умолчанию постоянное (true). Чтобы создать постоянный Exchange, вы можете написать:
@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}
Далее идет очередь Queue.Давайте сначала посмотрим, как выглядит конструктор Queue:
Также устанавливается, будет ли сохраняться через устойчивый параметр, значение по умолчанию — true. Таким образом, вы можете создать его без указания:
@Bean
public Queue fanoutExchangeQueueA() {
//只需要指定名称,默认是持久化的
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
}
На этом настройка сохранения сообщений завершена. Далее запускаем проект и отправляем несколько сообщений. Мы видим:
Как доказать, что он был сохранен?На самом деле, соответствующий файл можно найти:
Найдите каталог на соответствующем диске:
Сохранение сообщений может предотвратить потерю сообщений на сервере RabbitMQ из-за простоя и перезапуска..
3. Механизм подтверждения сообщения
3.1 механизм подтверждения
Когда производитель отправляет на сервер RabbitMQ, доставка может завершиться ошибкой из-за проблем с сетью, что приведет к потере данных.. Мы можем использовать режим подтверждения, чтобы предотвратить потерю данных. Как работает рабочий процесс? См. схему ниже:Как видно из рисунка выше, уведомление осуществляется через две функции обратного вызова **confirm() и returnMessage()**.
Сообщение, отправленное производителем в RabbitMQ, сначала будет отправлено в Exchange, что соответствует функции обратного вызова.confirm(). Вторым шагом является назначение маршрута от Exchange до Queue, и соответствующая функция обратного вызоваreturnedMessage().
Как реализовать код, смотрите демо:
первый вapplication.ymlДобавьте следующую конфигурацию в файл конфигурации:
spring:
rabbitmq:
publisher-confirms: true
# publisher-returns: true
template:
mandatory: true
# publisher-confirms:设置为true时。当消息投递到Exchange后,会回调confirm()方法进行通知生产者
# publisher-returns:设置为true时。当消息匹配到Queue并且失败时,会通过回调returnedMessage()方法返回消息
# spring.rabbitmq.template.mandatory: 设置为true时。指定消息在没有被队列接收时会通过回调returnedMessage()方法退回。
Иметь небольшую деталь,Издатель-возврат и обязательны, если все настроен, приоритет является предпочтительным в обязательном. Вы можете увидеть исходный код:Далее нам нужно определить метод обратного вызова:
@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);
/**
* 监听消息是否到达Exchange
*
* @param correlationData 包含消息的唯一标识的对象
* @param ack true 标识 ack,false 标识 nack
* @param cause nack 投递失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投递成功~消息Id:{}", correlationData.getId());
} else {
logger.error("消息投递失败,Id:{},错误提示:{}", correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息没有路由到队列,获得返回的消息");
Map map = byteToObject(message.getBody(), Map.class);
logger.info("message body: {}", map == null ? "" : map.toString());
logger.info("replyCode: {}", replyCode);
logger.info("replyText: {}", replyText);
logger.info("exchange: {}", exchange);
logger.info("routingKey: {}", exchange);
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
Здесь я просто печатаю сообщение, возвращенное методом обратного вызова.В реальном проекте возвращенное сообщение может быть сохранено в таблице журнала и далее обработано с помощью запланированной задачи.
я используюRabbitTemplateДля отправки необходимо установить RabbitTemplate на уровне службы:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
Готово! Далее мы тестируем, отправляем сообщение, которое мы можем утешить:Предполагая, что маршрут, соответствующий очереди для отправки сообщения, не соответствует, вы можете увидеть следующую информацию:
Это режим подтверждения. что он делаетЧтобы гарантировать, что сообщение не будет потеряно, когда производитель доставит сообщение RabbitMQ.
3.2 Механизм транзакций (ACK)
Первая картинка уже сказала,После того, как потребитель получит сообщение из очереди, он напрямую подтвердит получение.Если потребитель не работает или программа работает ненормально и данные не потребляются нормально, в этом случае произойдет потеря данных..
Следовательно, ключ состоит в том, чтобы изменить автоматический вход в действие в ручную вход и вернуться к подтверждению входа для обычного потребления. Если есть исключение, верните, чтобы отклонить вход и вернуться в очередь.Как реализовать код, смотрите демо:
первое место среди потребителейapplication.ymlУстановить фиксацию транзакции в файле какmanualРучной режим:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack模式
concurrency: 1 # 最少消费者数量
max-concurrency: 10 # 最大消费者数量
Затем напишите слушателя потребителя:
@Component
public class RabbitDemoConsumer {
enum Action {
//处理成功
SUCCESS,
//可以重试的错误,消息重回队列
RETRY,
//无需重试的错误,拒绝消息,并从队列中删除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("测试:抛出可重回队列的异常");
}
if ("error".equals(msg)) {
throw new Exception("测试:抛出无需重回队列的异常");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.RETRY;
} catch (Exception e2) {
//打印异常
e2.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒绝策略,消息重回队列
channel.basicNack(tag, false, true);
} else {
//Nack,拒绝策略,并且从队列中删除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Объясните приведенный выше код.Если нет никаких исключений, вручную подтвердите ответ на BasicAck сервера RabbitMQ (потребление выполнено успешно).
Если возникает какое-либо повторное исключение, мы отвечаем на basicNack и устанавливаем requeue.
Если это исключение, которое нельзя повторно поставить в очередь, оно ответит на basicNack и установит удаление из очереди RabbitMQ.
Следующий тест — отправить обычное сообщение «привет»:Объясните значение трех методов, возвращаемых ack.
①Успешное подтверждение
void basicAck(long deliveryTag, boolean multiple) throws IOException;
Потребитель вызывает этот метод для подтверждения сообщения после его успешной обработки.
- Доставка: Индекс сообщения
- множественный: следует ли пакетировать.. true: все сообщения меньше, чем deliveryTag, будут подтверждены сразу.
② Не удалось подтвердить
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag: индекс сообщения.
- множественный: следует ли пакетировать. true: все сообщения меньше, чем deliveryTag, будут отклонены сразу.
- requeue: повторно ставится в очередь отклоненный элемент.
③ Не удалось подтвердить
void basicReject(long deliveryTag, boolean requeue) throws IOException;
- DeliveryTag: ИНДЕКС сообщения.
- requeue: повторно ставится в очередь отклоненный элемент.
Разница между basicNack() и basicReject() заключается в следующем:basicNack() может отклонять пакетами, basicReject() может отклонять только одно сообщение за раз.
В-четвертых, яма столкнулась
4.1 Бесконечный цикл, вызванный включением механизма nack
Приведенный выше код я намеренно написал ошибку. Тест отправляет «плохо», то исключение будет брошено обратно в очередь. Что есть проблема: очередь возврата и потребительские расходы, потребленный потребитель, он вернулся в очередь, вызывая бесконечную петлю.Так как же избежать этой ситуации?
Поскольку nack вызовет бесконечный цикл, я предлагаю одну идею:Не используйте basicNack(), сохраните сообщение, вызывающее исключение, в таблицу и запишите созданное исключение, тело сообщения и идентификатор сообщения. Обработано задачами на время.
Если у вас есть хорошие решения, вы также можете оставить сообщение для обсуждения~
4.2 double ack
Иногда я по невнимательности, случайно включаю автоматический режим Ack, а потом вручную отвечаю на Ack. Он сообщит об этой ошибке:
消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:java技术爱好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
Эта ошибка возникает, вы можете проверить, добавил ли файл YML следующую конфигурацию:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
Если вышеуказанная конфигурация была добавлена, она все еще сообщает об ошибке,Возможно, вы использовали @Configuration для настройки SimpleRabbitListenerContainerFactory.По характеристикам SpringBoot код лучше, чем конфигурация.Конфигурация кода переопределяет конфигурацию yml, и вы забываете установить ручной режим.:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置手动ack模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
Если вы все еще получаете ошибку, она может быть написана не в том месте, в проекте производителя. Вышеупомянутая конфигурация должна быть настроена в проекте потребителя. Потому что режим ack предназначен для потребителей. Я просто не так написала, написала на продюсера, несколько часов кинула, слезы~
4.3 Проблемы с производительностью
На самом деле, ручной ACK определенно намного медленнее, чем автоматический, я проверил некоторую информацию в Интернете, и разница в производительности составляет примерно 10 раз. Поэтому обычно не рекомендуется открывать ручной режим ACK в практических приложениях. Тем не менее, открытие не является абсолютно невозможным, и конкретная ситуация подробно анализируется в зависимости от количества параллелизма, важности данных и так далее.
такВ реальных проектах необходимо взвесить важность параллелизма и данных, а затем принять решение о конкретном плане..
4.4 Включите режим ручного подтверждения, если нет своевременного ответа, это вызовет исключение из очереди
Если ручной режим ACK включен, но из-за ошибки в коде сервер RabbitMQ не отвечает, то сообщение будет помещено в кучу сообщений в состоянии Unacked, а сообщение Ready будет передано только при связь потребителя прервана. Если потребитель не был отключен, будет все больше и больше сообщений Unacked, которые будут занимать все больше и больше памяти, и, наконец, произойдет исключение.
Эта проблема, я не могу использовать свой компьютер для демонстрации, мой компьютер слишком завис.
V. Резюме
После вышеизложенного делается вывод, что у RabbitMQ есть три способа предотвратить потерю данных:
- сохранение сообщения
- Механизм подтверждения сообщения производителя (режим подтверждения)
- Режим подтверждения сообщения потребителя (режим подтверждения)
Коды всех приведенных выше примеров выложены на гитхаб:
Если вы считаете, что эта статья была вам полезна, ставьте лайк~
Ваши лайки - самая большая мотивация для моего творчества~
Если вы хотите впервые увидеть мои обновленные статьи, вы можете выполнить поиск в общедоступной учетной записи на WeChat "java技术爱好者
",Не хочу быть соленой рыбой, я программист, стремящийся запомниться всем. Увидимся в следующий раз! ! !
Возможности ограничены, если есть какие-то ошибки или неуместности, просьба критиковать и исправлять их, учиться и обмениваться вместе!