Как RabbitMQ гарантирует доступность сообщений

Java задняя часть

1. Введение в RabbitMQ

AMQP (Advanced Message Queuing Protocol, расширенный протокол очереди сообщений) — это стандартный протокол расширенной очереди сообщений прикладного уровня, предоставляющий службы унифицированного обмена сообщениями.Это открытый стандарт для протоколов прикладного уровня, разработанный для ПО промежуточного слоя, ориентированного на сообщения.

RabbitMQ, брокер сообщений и сервер очередей, реализует стандартный протокол AMQP.

Распределенные очереди сообщений имеют множество сценариев приложений, таких как асинхронная обработка, развязка приложений и сглаживание пиков трафика.

1. Асинхронная обработка

После регистрации пользователи должны отправлять текстовые сообщения и электронные письма.Традиционный метод заключается в том, чтобы сначала записать информацию о пользователе в базу данных, затем отправить текстовые сообщения и электронные письма и вернуться после завершения.

Если используется очередь сообщений, информация о пользователе может быть сначала записана в базу данных, а затем регистрационная информация может быть записана в очередь сообщений.Отправьте SMS, электронное письмо или другую бизнес-логику, чтобы подписаться на это сообщение и завершить отправку.

2. Разделение приложений

В приведенном выше примере, если на большом распределенном веб-сайте пользовательская система, система SMS и почтовая система могут быть независимыми системными службами.

В это время, после успешной регистрации пользователя, можно удаленно вызывать разные сервисные интерфейсы через RPC, но лучше подписаться на интересующие вас данные через очередь сообщений, даже если вы будете добавлять или удалять функции в будущем, главное дело менять не надо.

3. Отсечение пиков трафика

Как правило, в целях SPIKE или групповой покупки наличие трафика и приложение под слишком большим давлением. Вы можете присоединиться к очереди сообщений на переднем конце приложения и ограничить количество активных людей, установив максимальную длину очереди. В это время задний сервер может легко обработать данные.

2. Обмен сообщениями

В протоколе AMQP есть несколько основных понятий, которые мы должны понять в первую очередь.

1. Виртуальный хост

Виртуальные хосты, каждый виртуальный хост содержит все основные компоненты AMQP, в виртуальном хосте создаются пользователи, команды, коммутаторы и т.д. Типичное использование заключается в том, что если несколько продуктов компании хотят использовать только один сервер, их можно разделить на разные виртуальные хосты, и любая информация в нем существует независимо и не мешает друг другу.

2. Подключение

Соединение, TCP-соединение между приложением и сервером.

3. Канал

Канал, когда ваше приложение подключается к серверу, создает TCP-соединение. После открытия TCP-соединения можно создать канал, поэтому канал является внутренней логической единицей TCP-соединения. Это связано с тем, что создание и уничтожение TCP-соединения требует относительно больших затрат, а установление нового TCP-соединения для каждого доступа не только является огромным расточительством, но и может привести к узким местам в производительности системы.

4. Очередь

Очередь, куда в конечном итоге будут отправлены все сообщения, ожидающие, пока их подхватят те, кто заинтересован.

5. Обмен

Обмен, первая станция для сообщения, которое достигает службы, является обменом, а затем в соответствии с правилами распределения ключ маршрутизации сопоставляется, и сообщение помещается в соответствующую очередь. Стоит отметить, что существует более одного типа обмена.

  • Прямой Непосредственно подключен к обмену, только когда ключ маршрутизации в сообщении согласуется с ключом в отношениях привязки, обмен отправит сообщение в соответствующую очередь

  • разветвление Широковещательный обмен, пока сообщение отправляется на широковещательный обмен, он отправит сообщение во все очереди.

  • Тема Тематический обмен, согласно ключу маршрутизации, подстановочным правилам (* и #), отправка сообщений в соответствующую очередь

6. Привязка

Привязка, связь привязки между обменом и очередью, привязка содержит ключ маршрутизации, информация о привязке хранится в таблице поиска биржи, и биржа распределяет сообщения в соответствии с ней.

После понимания концепций, связанных с этими компонентами, давайте подытожим, как сообщение передается в RabbitMQ.

3. Постоянство и подтверждение отправителя

1. Настойчивость

На самом деле на приведенном выше рисунке показан лишь базовый процесс потока сообщений.У таких компонентов, как обмены и очереди, есть еще один важный атрибут: постоянство.

По умолчанию, после перезапуска сервера RabbitMQ, созданные нами переключатели и очереди исчезают.Конечно, если есть данные, которые могут быть использованы в будущем, от них будет трудно избавиться. Постоянные обмены и очереди используются для их воссоздания и связывания после перезапуска сервера AMQP.В RabbitMQ установите для свойства durable значение true.

Однако, кроме этого недостаточно. Хотя обмены и очереди гарантированно безопасны, данные, которые еще не использованы, подвергаются риску. Следовательно, нам также необходимо установить постоянный режим доставки сообщения.

Таким образом, если сервер RabbitMQ будет перезапущен, наши политики и связанные данные будут гарантированы. Поэтому мы говорим, что сообщения, которые можно восстановить после сбоя сервера AMQP, называются постоянными сообщениями. Затем он должен гарантировать следующие три пункта:

  • Установите постоянный режим доставки
  • Обменники настойчивы
  • Очередь постоянная

2. Отправитель подтверждает

До сих пор мы хранили сообщение в безопасности. Однако есть еще одна проблема. Поскольку операция публикации не возвращает производителю никакой информации, как узнать, правильно ли сервер принял сообщение и сохранил его на жестком диске?

Для этого мы можем установить канал в режим транзакций. Транзакции являются частью стандарта AMQP, но у RabbitMQ есть лучший подход, который представляет собой режим подтверждения отправителя, подтверждения издателя. Если установлен режим подтверждения, опубликованному сообщению будет присвоен уникальный идентификационный номер, и после того, как сообщение будет доставлено в соответствующую очередь, канал отправит производителю режим подтверждения отправителя (содержащий уникальный идентификатор сообщения).

В-четвертых, интегрируйте с примерами Spring

Столько чепухи, просто чтобы проложить путь к следующей части кода. В конце концов, после понимания вышеизложенного код почти на бумаге.

1. Файл конфигурации

В конфигурационном файле мы должны сначала объявить информацию о сервере RabbitMQ, IP-адресе, номере порта, имени пользователя и пароле и т.д., но самое главное установить режим подтверждения выпуска.

<bean id="rabbitConnectionFactory"
	class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
	<constructor-arg value="127.0.0.1"/>
	<property name="username" value="shiqizhen"/>
	<property name="password" value="shiqizhen"/>
	<property name="port" value="5672"></property>
	<property name="virtualHost" value="shiqizhen"></property>
	<property name="publisherConfirms" value="true"></property>
	<property name="publisherReturns" value="true"></property>
</bean>

Далее объявите обмен и очередь, помните, что они постоянны, это верно.

<rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
//队列的名字、持久化、不要自动删除、不是独享队列
<rabbit:queue name="userInfoQueue" durable="true" auto-delete="false" exclusive="false"/>
//交换器,类型为direct。并绑定交换器和队列的关系,路由键为10086
<rabbit:direct-exchange name="user-exchange" durable="true" auto-delete="false">
	<rabbit:bindings>
		<rabbit:binding queue="userInfoQueue" key="10086"/>
	</rabbit:bindings>
</rabbit:direct-exchange>

Наконец, настройте шаблоны потребителей и сообщений.

//配置消费者 ref为bean的引用 queues指明了消费者与队列的关系
//重要的是acknowledge 确认模式为手动确认
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
        <rabbit:listener ref="consumerListener" queues="userInfoQueue" method="onMessage" />
</rabbit:listener-container>

//配置Spring RabbitMQ消息模板 
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
	<constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
	<property name="confirmCallback" ref="publisherConfirm"></property>
	<property name="returnCallback" ref="returnMsgCallBack"></property>
	<property name="mandatory" value="true"></property>
</bean>

2. Производители

Выше мы объявили rabbitTemplate и можем напрямую использовать его метод send для отправки сообщений. Однако у него есть несколько параметров, которые необходимо понять в первую очередь.

  • обмен Имя биржи, на которую отправляется сообщение
  • ключ маршрутизации Ключ маршрутизации, как биржа распределяет сообщение в соответствующую очередь
  • Сообщение Объект тела сообщения, который содержит тело сообщения и свойства сообщения. Свойства сообщения содержат много вспомогательной информации, такой как тип содержимого сообщения, идентификатор сообщения, идентификатор пользователя и т. д.
  • корреляционные данные Данные, связанные с сообщением, на самом деле имеют только атрибут ID. Но очень важно, чтобы этот параметр был включен в подтвержденный издателем метод обратного вызова. Мы можем визуально увидеть, какое сообщение было отправлено успешно или не удалось, основываясь на нем.
@Controller
public class IndexController {
	
	@Autowired
	RabbitTemplate rabbitTemplate;

	@RequestMapping("/send_msg")
	@ResponseBody
	public User send_msg() {	
		String exchange = "user-exchange";
		String routingKey = "10086";
		
		User user = new User();
		String id = IdUtil.getId();
		user.setUid(id);
		user.setUsername("小小沙弥");
		user.setPassword("1234");
		user.setCreatetime(DateUtil.getDateTime(new Date()));
		
		CorrelationData correlation = new CorrelationData(id);
		Message message = new Message(JSONObject.toJSONBytes(user, SerializerFeature.WriteNullStringAsEmpty), new MessageProperties());
		logger.info("已发送消息到RabbitMQ服务器:{}",JSONObject.toJSONString(user));
		rabbitTemplate.send(exchange, routingKey,message,correlation);	
		return user;
	}
}

3. Потребители

Потребитель — это bean-компонент, на который ссылается ссылка прослушивателя, которую мы настроили выше. Помните, что мы установили режим подтверждения на ручное подтверждение, поэтому на стороне потребителя есть очень важное действие, которое заключается в подтверждении сообщения.

  • канал.basicAck(доставкаТаг, Ложь) Первый параметр — это идентификатор сообщения, сгенерированный внутри RabbitMQ, а второй параметр указывает, следует ли подтверждать сообщения пакетами. С помощью этой инструкции мы сообщаем производителю, что сообщение было правильно использовано, и RabbitMQ удалит сообщение с диска.
  • channel.basicReject(deliveryTag, ложь) отклонить сообщение. Если потребляемое сообщение не то, что нам нужно, или во время обработки сообщается об ошибке, мы можем отклонить сообщение. Но стоит отметить второй параметр. Если установлено значение false, сообщение отклоняется и удаляется с сервера; если установлено значение true, сообщение отклоняется и возвращается в очередь. Если у вас есть только один потребитель, лучше не устанавливать для этого параметра значение true, иначе сообщение будет повторяться до тех пор, пока сервер-потребитель не будет уничтожен. Если оно отклонено из-за сбоя обработки, лучше удалить сообщение и записать его в файл журнала или базу данных.
@Service
public class ConsumerListener implements ChannelAwareMessageListener{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void onMessage(Message message, Channel channel) throws Exception {
		
		logger.info("消费者监听到RabbitMQ消息...");
		MessageProperties properties = message.getMessageProperties();
		String msg = new String(message.getBody(),"utf-8");
		logger.info("交换器:{},路由键:{}",properties.getReceivedExchange(),properties.getReceivedRoutingKey());
		logger.info("消息内容:{}",msg);	
		long deliveryTag = properties.getDeliveryTag();
		channel.basicAck(deliveryTag, false);//确认信息,false为不批量确认
		//channel.basicReject(deliveryTag, true);//true为重入队列 false为删除消息
	}
}

4. Отправитель подтверждает

Когда мы отправляем сообщение в RabbitMQ, первой остановкой является обмен. Независимо от того, может ли RabbitMQ правильно получать сообщения, мы полагаемся на него, чтобы давать обратную связь. CorrelationData здесь задается на стороне производителя, мы можем использовать его как идентификатор сообщения, или мы можем напрямую написать сообщение здесь.

@Component
public class PublisherConfirm implements ConfirmCallback{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {
			logger.info("消息投递成功!");
		}else {
			logger.warn("消息投递失败,原因:{},消息ID:{}",cause,correlationData.getId());
		}
	}
}

Если мы ошибемся в названии биржи, здесь вы получите следующее сообщение:

22:57:51,635  WARN PublisherConfirm:19 - 消息投递失败,原因:
channel error; protocol method: #method<channel.close>
(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40),

消息ID:516387069669408768
  22:57:51,638 ERROR CachingConnectionFactory:1278 - Channel shutdown: 
channel error; protocol method:
 #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)

5. Обратный обратный вызов

В дополнение к установке подтверждения отправителя RabbitMQ, в Spring также есть значение publisherReturns, которое мы отмечаем. Хотя мы отправили сообщение на биржу, стоит знак вопроса, может ли биржа корректно распределить сообщение в соответствующую очередь. PublisherReturns вступает в игру, если сообщение не может быть доставлено в указанную очередь. Помните, что если вы хотите применить эту функцию, вам нужно установить обязательное значение true.

@Component
public class ReturnMsgCallBack implements ReturnCallback{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void returnedMessage(Message message, int replyCode, 
					String replyText, String exchange, String routingKey) {
		logger.info("消息内容:{}",new String(message.getBody()));
		logger.info("回复文本:{},回复代码:{}",replyText,replyCode);
		logger.info("交换器名称:{},路由键:{}",exchange,routingKey);	
	}
}

Если мы случайно напечатаем имя ключа маршрутизации, он будет вызван здесь.

23:24:27,813  INFO ReturnMsgCallBack:16 - 消息内容:{"createtime":"2018-11-25 23:24:24","password":"1234","role":null,"uid":"516393749815754752","username":"小小沙弥"}
23:24:27,814  INFO ReturnMsgCallBack:17 - 回复文本:NO_ROUTE,回复代码:312
23:24:27,814  INFO ReturnMsgCallBack:18 - 交换器名称:user-exchange,路由键:10086_xxx

//这里是发送方确认打印的信息 说投递到交换器成功
23:24:27,814  INFO PublisherConfirm:17 - 消息投递成功!

Есть проблема, как и в первом примере, если имя ключа маршрутизации написано неправильно, отправитель подтверждает, что напечатает сообщение об исключении, что ack является ложным, но почему он не вызывает publisherReturns?

Если ключ маршрутизации неверен, сообщение никогда не было получено. Это должно быть серьезной ошибкой, поэтому RabbitMQ напрямую закрыл текущий канал.

Channel shutdown: 
channel error; protocol method:
reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost ...

Пять, следите за состоянием сервера RabbitMQ.

Если ваша служба RabbitMQ не является кластером, что мы делаем, когда служба RabbitMQ останавливается из-за сбоя сети или по другим причинам? Конечно, вы можете добавить try/catch в метод Send, чтобы возвращать свой статус на основе информации о перехвате. Но есть лучшая идея, которую можно использовать в комбинации. При создании подключения службы RabbitMQ нам нужно настроить bean-компонент,CachingConnectionFactoryу него есть методaddConnectionListener, мы можем использовать его для мониторинга состояния подключения к серверу.

public class RabbitMQConnectionListener implements ConnectionListener{
	public void onCreate(Connection connection) {
		System.out.println("服务器已启动...");
	}
	public void onClose(Connection connection) {
		System.out.println("服务器已关闭...");
	}
}

И в подходящем месте, например в методе инициализации контейнера Spring, добавить такое предложениеrabbitConnectionFactory.addConnectionListener(new RabbitMQConnectionListener());

Таким образом, мы можем понять статус подключения сервера RabbitMQ, а затем мы можем судить об этом статусе, когда производитель вызывает метод отправки в соответствии с этим статусом. Если нет подключения, вы можете сначала сохранить сообщение в базе данных или в кэше. При подключении к RabbitMQ мы сначала достаем кешированное сообщение и отправляем его, а затем сбрасываем состояние на «подключено».

6. Резюме

В этой статье кратко представлены связанные концепции стандарта протокола AMQP и то, как RabbitMQ правильно настраивает и использует такие механизмы, как постоянные сообщения, режим отправителя и обратные вызовы возврата в Spring. И в конце рассказывается, как отслеживать состояние подключения к серверу RabbitMQ в Spring. Одним словом, как мы используем RabbitMQ, чтобы гарантировать, что сообщения не будут потеряны. Надеюсь, эта статья помогла вам использовать RabbitMQ!