Серия RabbitMQ (4) Транзакция RabbitMQ и подтверждение подтверждения сообщения отправителя - углубленная интерпретация

задняя часть сервер RabbitMQ Wireshark

введение

По предыдущим знаниям (Глубокое понимание принципа работы RabbitMQ и простота использования,Введение и практика нескольких режимов работы Кролика) Мы знаем, что если вы хотите обеспечить надежность сообщения, вам необходимо сохранить сообщение.Однако, помимо настройки кода, есть еще один важный шаг в сохранении сообщения, который заключается в обеспечении бесперебойной работы вашего сообщения. Введите Брокер (прокси-сервер), как показано на рисунке:

При нормальных обстоятельствах, если сообщение попадает в очередь через биржу, сохранение сообщения может быть завершено, но если произойдет авария до того, как сообщение достигнет брокера, сообщение будет потеряно. решить эту проблему?

У RabbitMQ есть два способа решить эту проблему:

  1. Реализуется через механизм транзакций, предоставляемый AMQP;
  2. Реализовано с использованием режима подтверждения отправителя;

1. Деловое использование

Реализация транзакции заключается в основном в установке канала (Channel), есть три основных метода:

  1. channel.txSelect() объявляет запуск режима транзакции;

  2. channel.txComment() фиксирует транзакцию;

  3. channel.txRollback() откатывает транзакцию;

Из приведенного выше видно, что все транзакции начинаются с tx, что должно быть аббревиатурой расширения транзакции (модуль расширения транзакции).Если есть точное объяснение, пожалуйста, оставьте сообщение под блогом.

Давайте посмотрим на конкретную реализацию кода:

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);	
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(_queueName, true, false, false, null);
String message = String.format("时间 => %s", new Date().getTime());
try {
	channel.txSelect(); // 声明事务
	// 发送消息
	channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
	channel.txCommit(); // 提交事务
} catch (Exception e) {
	channel.txRollback();
} finally {
	channel.close();
	conn.close();
}

Примечание. Пользователям необходимо настроить config.xx как собственную информацию о Rabbit.

Из приведенного выше кода мы видим, что код перед отправкой сообщения такой же, как и предыдущий, но перед отправкой сообщения нужно объявить канал как режим транзакции, и можно зафиксировать или откатить транзакцию.

Разобравшись с реализацией транзакции, а затем с тем, как выполняется транзакция, воспользуемся wireshark для перехвата пакета, как показано на рисунке:

Введите ip.addr==rabbitip && amqp для просмотра связи между клиентом и кроликом, вы можете увидеть процесс взаимодействия:

  • Клиент отправляет на сервер Tx.Select (режим открытой транзакции)
  • Сервер возвращает Tx.Select-Ok (режим открытой транзакции в порядке)
  • вперед новости
  • Клиент отправляет для фиксации транзакции Tx.Commit
  • Серверная сторона возвращает Tx.Commit-Ok

Вышеупомянутое завершает процесс взаимодействия с транзакцией.Если есть проблема в какой-либо из ссылок, будет выброшено исключение IoException для ее удаления, чтобы пользователь мог перехватить исключение для отката транзакции или решить, следует ли повторить сообщение.

Ну, так как транзакция уже есть, использовать режим с подтверждением отправителя нечего, потому что производительность транзакции очень низкая.Тест производительности транзакции:

Режим транзакции, результат выглядит следующим образом:

  • Режим транзакции, отправка 1 Вт данных, время выполнения: 14197 с.
  • Режим транзакции, отправка 1 Вт данных, время выполнения: 13597 с.
  • Режим транзакции, отправка данных 1w, время выполнения: 14216s

В нетранзакционном режиме результат следующий:

  • Нетранзакционный режим, отправка 1 Вт данных, время выполнения: 101 с.
  • Нетранзакционный режим, отправка 1 Вт данных, время выполнения: 77 с.
  • Нетранзакционный режим, отправка 1 Вт данных, время выполнения: 106 с.

Как видно из вышеизложенного, производительность нетранзакционного режима в 149 раз выше производительности транзакционного режима.Мой компьютерный тест выглядит так.Разные конфигурации компьютеров немного отличаются, но вывод один и тот же, производительность транзакционного режима режим намного хуже. , есть ли решение, способное не только обеспечить достоверность сообщения, но и учесть производительность? Это режим подтверждения подтверждения отправителя, который будет обсуждаться далее.

расширить знания

Мы знаем, что потребители могут использовать сообщения для автоматической или ручной отправки подтверждающих сообщений о потреблении, поэтому, если мы используем транзакции в режиме потребителя (конечно, если мы используем подтверждающие сообщения вручную, транзакции вообще не используются), что произойдет?

Потребительский шаблон использует транзакции

Предполагая, что транзакции используются в потребительском шаблоне, и транзакция откатывается после подтверждения сообщения, какие изменения внесет RabbitMQ?

Результаты делятся на два случая:

  1. AutoAck=false поддерживает транзакции при ручном ответе, то есть, даже если вы вручную подтвердили, что сообщение было получено, после сообщения с подтверждением будет ожидаться разрешение возврата транзакции, прежде чем принимать решение о подтверждении. сообщение или поместить его обратно в очередь. Если вы вручную подтвердите текущую транзакцию, а затем откатите транзакцию, то транзакция откатится как основная, и сообщение будет снова поставлено в очередь;
  2. autoAck=true Если самоподтверждение истинно, то транзакция не поддерживается, то есть даже если вы откатите транзакцию после получения сообщения, это не поможет, очередь удалила сообщение;

2. Подтвердите режим подтверждения отправителя

Подтверждение Режим подтверждения отправителя аналогичен транзакции, и он также подтверждается отправителем путем установки канала.

Три реализации Confirm:

Способ 1: channel.waitForConfirms() обычный режим подтверждения отправителя;

Способ 2: режим пакетного подтверждения channel.waitForConfirmsOrDie();

Способ 3: channel.addConfirmListener() асинхронно отслеживает режим подтверждения отправителя;

Способ 1: обычный режим подтверждения

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
String message = String.format("时间 => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
	System.out.println("消息发送成功" );
}

Глядя на код, мы можем знать, что нам нужно использовать только channel.confirmSelect(), чтобы объявить режим подтверждения отправителя перед отправкой сообщения, а затем использовать channel.waitForConfirms(), чтобы дождаться подтверждения сообщения сервером.

Способ 2: режим пакетного подтверждения

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
	String message = String.format("时间 => %s", new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
System.out.println("全部执行完成");

Из приведенного выше кода видно, что channel.waitForConfirmsOrDie(), используя синхронный режим, выполнит следующий код после отправки всех сообщений: пока одно сообщение не будет подтверждено, будет выброшено исключение IOException.

Способ 3: асинхронный режим подтверждения

// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// 创建信道
Channel channel = conn.createChannel();
// 声明队列
channel.queueDeclare(config.QueueName, false, false, false, null);
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
	String message = String.format("时间 => %s", new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
	@Override
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("未确认消息,标识:" + deliveryTag);
	}
	@Override
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
	}
});

Преимущество асинхронного режима заключается в том, что эффективность выполнения высока, и нет необходимости ждать завершения выполнения сообщения, а нужно только отслеживать сообщение.Информация, возвращаемая асинхронно выше, выглядит следующим образом:

Видно, что код выполняется асинхронно, и подтверждение сообщения может быть подтверждено пакетами.Независимо от того, зависит ли пакетное подтверждение от возвращаемого множественного параметра, этот параметр является логическим значением, если true означает, что все сообщения до значения deliveryTag выполняется партиями, если ложно, то это означает одно подтверждение.

Подтвердить тест производительности

Тестовая предпосылка: как и в случае с транзакциями, мы отправляем сообщения 1w.

Способ 1: подтвердить нормальный режим

  • Время выполнения: 2253 с
  • Время выполнения: 2018г.
  • Время исполнения: 2043с

Способ 2: Подтвердить пакетный режим

  • Время выполнения: 1576 с
  • Время выполнения: 1400с
  • Время выполнения: 1374 с

Способ 3: Подтвердите метод асинхронного мониторинга

  • Время выполнения: 1498 с
  • Время выполнения: 1368 с
  • Время выполнения: 1363 с

Суммировать

Основываясь на общей тестовой ситуации, производительность подтверждения пакетного определения и подтверждения асинхронного режима не сильно отличается, а режим подтверждения примерно в 10 раз быстрее, чем транзакция.

Нажмите и удерживайте QR-код, чтобы подписаться на мой технический общедоступный аккаунт.