Всем известно, что в Rocketmq есть концепция ConsumerGroup. В кластерном режиме несколько серверов настраиваются с одной и той же группой ConsumerGroup, так что только один сервер принимает сообщения одновременно (обратите внимание, что не гарантируется, что они будут использованы только один раз, и существует нестабильность сети). Итак, автор очень озадачен, как Rocketmq реализует этот режим? Как сделать так, чтобы потреблял только один сервер?
Хотя ответ прост, это хорошая возможность взглянуть на исходный код вопроса.
Структура RocketMq
Как видно из рисунка, MQ в основном имеет две связи: доставка сообщений и получение сообщений.
Многие архитектуры следуют веяниям времени.Конечно, структурная система Rocketmq изначально не создана Али, а основана на протоколе AMQP. Производитель, брокер и потребитель в Rocketmq основаны на концепциях AMQP. Итак, давайте поговорим о AMQP (Advanced Message Queuing Protocol, Расширенный протокол очереди сообщений) здесь, чтобы каждый мог лучше понять процесс разработки технологии.
скачать документ http://www.amqp.org/specification/0-9-1/amqp-org-download
- Брокер: приложения, которые получают и распространяют
- Виртуальный хост: разделите основные компоненты AMQP на виртуальную группу по соображениям мультиарендности и безопасности. Каждый арендатор изолирован от сети, аналогично концепции пространства имен в Linux (вы можете сами найти это в Google).
- Соединение: TCP-соединение между издателем/потребителем и брокером.
- Канал: это более легкое соединение, чем соединение, и является логическим соединением на соединении.
- Exchange: отвечает за распределение сообщений по разным очередям.
- Очередь: сообщение в конечном итоге попадет в Очередь, сообщение будет отправлено Потребителю Брокером или сообщение будет извлечено Потребителем.
- Связывание: стратегия маршрутизации сообщений между обменом и очередью
3 типа очередей сообщений
Конечно, на основе такого протокола в выборе очереди сообщений блистает не только RocketMq, но и разные очереди сообщений.
https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ
В основном делятся на 3 лагеря:
- Есть потоки Broker с тяжелыми темами: kafka, JMS
- Существует поток темы Broker light: RocketMQ
- Нет брокера: ZeroMQ
Конечно, если вы знакомы с протоколом AMQP, вы также можете самостоятельно разработать очередь сообщений.
https://zhuanlan.zhihu.com/p/28967866
Имея некоторую предысторию, давайте взглянем на процесс доставки сообщений в RocketMQ. Или конкретный вопрос, как RocketMQ выбирает очередь для доставки?
Как Producer доставляет сообщения в разные очереди
Здесь упоминается, что весь код для производителей и потребителей в RocketMq находится в клиентском пакете. Откройте исходный код, вы увидите, что в Procuder есть пакет селектора, и посмотрите, подходит ли этот пакет.
Вы можете видеть, что три класса под селектором реализуют MessageQueueSelector, давайте посмотрим на код MessageQueueSelector.
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
public class MessageQueue {
private String topic;
private String brokerName;
private int queueId;
}
Посмотрите, где вызывается MessageQueueSelector.select(), и обнаружите, что это DefaultMQProducerImpl, после чего вы сможете подтвердить, какая очередь выбрана MessageQueueSelector.
RocketMq предоставляет 3 различных способа выбора очередей:
- SelectMessageQueueByHash
- SelectMessageQueueByMachineRoom
- SelectMessageQueueByRandom
Количество очередей по умолчанию
Внимательные школьники обязательно спросят, а количество очередей бесконечно? Это можно найти в руководстве RocketMq.Количество очередей по умолчанию равно 4 (defaultTopicQueueNums: 4).Конечно, вы также можете настроить его самостоятельно.
При этом не знаю может кто из одноклассников нашел не то место.Просто автор сначала нашел не то место.Также нашел selectOneMessageQueue в TopicPublishInfo.Код такой.
public class TopicPublishInfo{
// 不同版本,代码有些不同,逻辑类似
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName != null) {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return null;
}
else {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
return this.messageQueueList.get(pos);
}
}
}
Я проверил вызывающего абонента и обнаружил, что это MQFaultStrategy.Похоже, что когда Rocketmq не удается использовать, он повторно доставляет сообщение в другую очередь, чтобы его можно было распределить по разным машинам для использования в кластерном режиме. (Вы все еще задаетесь вопросом, почему разные машины могут быть гарантированы, пожалуйста, прочитайте ниже)
Как потребитель получает сообщения из очереди сообщений
Это более сложный шаг для понимания.Во-первых, вы можете посмотреть руководство RocketMQ:
Все потребители RocketMQ извлекают сообщения из брокера для потребления, но для получения сообщений в режиме реального времени RocketMQ использует метод длительного опроса, чтобы гарантировать, что характер сообщений в реальном времени согласуется с методом push. Этот метод длительного опроса аналогичен механизму получения сообщений Web QQ. Пожалуйста, обратитесь к следующей информации для получения дополнительной информации. http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
Хотя объяснение очень подробное, оно все же не очень дружелюбно для новичков. Проще говоря, используется long polling, клиент инициирует запрос и сервер подключается первым, но если на сервере нет данных, это соединение или холд, и соединение закрывается, когда есть данные, проталкиваемые клиенту. Это не только гарантирует, что потребители не будут перегружены восходящими сообщениями, но также обеспечивает характер сообщений в реальном времени.
Тогда возникает еще один вопрос: как Consumer извлекает сообщения из MessageQueue? Это случайная тяга?
Давайте посмотрим на MQPullConsumer, от него наследуется DefaultMQPullConsumer.
public class MQPullConsumer {
// 拉消息,非阻塞
//
// @param mq from which message queue
// @param subExpression 订阅的tag,只支持"tag1 || tag2 || tag3"
// @param offset 标志位
// @param maxNums 消费最大数量
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
}
Вы можете видеть, что MessageQueue передается, что смущает Я действительно не могу понять, когда решать, из какой очереди извлекать сообщения. Благодаря всемогущей поисковой системе,
https://zhuanlan.zhihu.com/p/25140744
RocketMq имеет специальный класс AllocateMessageQueueStrategy.class, который скрыт в пакете Client.Consumer.rebalance.
- AllocateMessageQueueAveragely
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueConsistentHash
Каждый раз, когда количество потребителей изменяется, будет запускаться AllocateMessageQueueStrategy. То есть очередь, извлекаемая Потребителем каждый раз, является фиксированной.
Теперь, оглядываясь назад на первую диаграмму архитектуры RocketMQ, вы думаете, что рисунок очень тщательный.
Суммировать
- Любой фреймворк имеет историю производных изменений, и только понимая историю архитектурных изменений, можно лучше понять фреймворк.
- Внимательно прочитайте инструкцию, в ней много архитектурных подробностей
- Изучайте исходный код с вопросами