Прошлый опыт говорит нам, что для режима push часто требуется длинная связь между брокером и потребителем.После того, как производитель успешно отправит сообщение брокеру, брокер продолжит активно отправлять сообщение потребителю.
Реализация Roket MQ использует другой подход: push-режим ta по сути является оболочкой для pull-режима. Автоматизируйте действие извлечения вручную, передав его выделенному потоку.
Это означает, что в Rocket MQ нет реального режима push.
Начните с PullMessageService
Ключом к автоматизации извлечения сообщений является PullMessageService. Обычно знание класса из отношения наследования ta дает огромную награду, но иногда мы не можем начать из-за сложного отношения наследования.К счастью, система наследования ta относительно проста, что помогает нам выяснить контекст как как можно скорее.Из диаграммы UML мы можем естественным образом сгенерировать следующие ассоциации:
- Реализуйте интерфейс Runnable, возможно, метод запуска является записью для автоматического извлечения сообщений.
- Основываясь на первом пункте, мы делаем вывод, что может быть поток, который молча поддерживает работу метода run.
- Основываясь на первых двух пунктах и соглашениях об именах, у нас есть основания полагать, что ServiceThread должен быть связан с потоками.
public abstract class ServiceThread implements Runnable {
/* 内部持有一个线程 */
private Thread thread;
/* 线程安全的boolean类型 */
private final AtomicBoolean started = new AtomicBoolean(false);
public void start() {
/* started保证了线程只会被启动一次 */
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
/* getServiceName()抽象方法交由子类实现,用以获取线程名称 */
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
}
Исходный код ServiceThread идеально соответствует нашему предыдущему предположению, поэтому мы можем сместить фокус на метод запуска PullMessageService.
public class PullMessageService extends ServiceThread {
/* PullRequest队列 */
private final LinkedBlockingQueue<PullRequest> pullRequestQueue
= new LinkedBlockingQueue<>();
@Override
public void run() {
/* 每次拉取消息都要检测自身状态,改变stopped的状态可以停止拉取消息逻辑 */
while (!this.isStopped()) {
/* pullRequestQueue 队列为空会阻塞 */
PullRequest pullRequest = this.pullRequestQueue.take();
/* 进行消息拉取 */
this.pullMessage(pullRequest);
}
}
}
Очевидно, что это модель производитель-потребитель. Когда в очереди pullRequestQueue есть объект PullRequest, метод run извлечет объект и выполнит логику извлечения.Если очередь производителя пуста, заблокируйте очередь и приостановите поведение потребления. Детали извлечения сообщений до сих пор не были известны, давайте взглянем на метод pullMessage.
public class PullMessageService extends ServiceThread {
private final MQClientInstance mQClientFactory;
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
private void pullMessage(PullRequest pullRequest) {
MQConsumerInner consumer = this.mQClientFactory
.selectConsumer(pullRequest.getConsumerGroup());
DefaultMQPushConsumerImpl impl =
(DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
}
}
Другими словами, объект mQClientFactory должен передаваться, когда PullMessageService создает экземпляр объекта. Через MQClientInstance#selectConsumer() можно запросить объект-потребитель, соответствующий текущему запросу pullRequest, а затем выполнить логику принудительной передачи.Прямая принудительная передача здесь связана с тем, что объект PullMessageService будет использоваться только в режиме push. Затем делегируйте логику получения сообщений DefaultMQPushConsumerImp#pullMessage().
Слишком много незнакомых объектов сыпется в этот момент.Если автор не объяснит это, боюсь, читателям будет сложно это принять.Просто проанализируйте класс MQClientInstance, соответствующий объекту mQClientFactory.
MQClientInstance
Во-первых, давайте посмотрим, как рождается объект MQClientInstance.При запуске каждого Consumer переменная-член mQClientFactor будет заполнена, но MQClientInstance спроектирован как одноэлементный режим. Все потребители и производители в JVM имеют один и тот же экземпляр MQClientInstance, и потребитель в конце зарегистрируется в mQClientFactory.
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MQClientInstance mQClientFactory;
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST: {
/* 初始化 MQClientInstance */
mQClientFactory = MQClientManager.getInstance()
.getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
mQClientFactory.registerConsumer(
this.defaultMQPushConsumer.getConsumerGroup(), this
);
/* 虽然每个Consumer都调用此方法,但只启动一次 */
mQClientFactory.start()
}
default:
break;
}
}
}
Короче говоря, это информация о потребителе переменной-члена ConsumerTable сопровождающего экземпляра MQClientInstance, ключ — это имя ConsumerGroup, а значение — это сам потребитель.
public class MQClientInstance {
private final ConcurrentMap<String, MQConsumerInner> consumerTable
= new ConcurrentHashMap<>();
public boolean registerConsumer(String group, MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
MQConsumerInner prev = this.consumerTable
.putIfAbsent(group, consumer);
return true;
}
}
Таким образом, в приведенном выше примере вы можете запросить соответствующий объект-потребитель в соответствии с именем ConsumerGroup через объект MQClientInstance. Consumer и MQClientInstance ссылаются друг на друга, образуя циклическую зависимость.
Инициализация и время запуска PullMessageService
До сих пор наш PullMessageService по-прежнему был обычным объектом, потому что поток без start() — это как любовь без материала, просто тарелка рыхлого песка, и ее развеет без ветра.
- Мы всегда знали, что PullMessageService может запрашивать потребителей, но мы не упомянули об этом, когда анализировали исходный код запуска Consumer.
- В то же время, если вы суммируете и обнаружите, что несколько потребителей в одной и той же JVM, похоже, используют один и тот же объект PullMessageService, в противном случае нет необходимости искать.
Глядя на переписку, я чувствую, что PullMessageService должен быть связан с MQClientInstance.
public class MQClientInstance {
private final PullMessageService pullMessageService;
public MQClientInstance(ClientConfig clientConfig,
int instanceIndex, String clientId, RPCHook rpcHook) {
this.rebalanceService = new RebalanceService(this);
}
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.rebalanceService.start();
break;
default:
break;
}
}
}
}
Конечно же, объект MQClientInstance выполняет метод построения, и объект pullMessageService инициализируется.Во время выполнения метода запуска вызывается метод pullMessageService.start() для запуска потока, извлекающего сообщение.
О производстве
Выше автор представил, кто отвечает за извлечение сообщений, какой механизм используется для их извлечения и как запускается поток извлечения.Однако, поскольку PullMessageService является типичной моделью производства-потребления, мы вводим только процесс потребления.
- Как производственная очередь получает запросы на извлечение сообщений?
- Когда появился объект запроса на включение?
- LinkedBlockingQueue — неограниченная очередь, будет ли риск переполнения памяти?
Этот вопрос задерживается в моей голове один за другим, задерживаясь.
Замечено, что производственная очередь является приватно-модифицированной переменной-членом, и внешний мир не имеет разрешения на доступ к ней, поэтому основная идея состоит в том, чтобы проверить метод работы с этим свойством в PullMessageService, и мы должны получить ответ.
public class PullMessageService extends ServiceThread {
public void executePullRequestLater(PullRequest pullRequest,
long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(
() -> executePullRequestImmediately(pullRequest),
timeDelay,
TimeUnit.MILLISECONDS
);
}
}
public void executePullRequestImmediately(PullRequest pullRequest) {
this.pullRequestQueue.put(pullRequest);
}
}
Вызов этих двух методов будет генерировать объекты запроса на извлечение для очереди, один из которых будет немедленно заполнен в очереди, а другой будет отправлен в ScheduledExecutorService, а логика размещения будет выполнена после небольшой задержки timeDelay.
Отслеживая весь путь, я обнаружил, что все записи вызовов являются методами executePullRequestLater и executePullRequestImmediately DefaultMQPushConsumerImpl.Метод очень прост: получить объект извлечения сообщения через объект mQClientFactory, который хранится сам по себе, а затем вызвать метод постановки в очередь объект.
private void executePullRequestLater(PullRequest pullRequest,
long timeDelay) {
this.mQClientFactory.getPullMessageService()
.executePullRequestLater(pullRequest, timeDelay);
}
public void executePullRequestImmediately(PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService()
.executePullRequestImmediately(pullRequest);
}
Отношения вызова следующие:
Видно, что почти все вызовы существуют в методе pullMessage, и видите ли вы процесс построения экземпляра PullRequest в этом методе, после исключения остается только класс реализации балансировки нагрузки RebalancePushImpl.
Отслеживая весь путь, оказывается, что объект запроса на вытягивание был построен в родительском классе RebalanceImpl для RebalancePushImpl.
Конкретный принцип заключается в том, что RebalanceImpl отвечает за балансировку нагрузки ConsumeQueue, поэтому RebalanceImpl всегда знает, в какие очереди каждый потребитель выделяется в первый раз, поэтому он может создавать объекты PullRequest в соответствии с очередями, выделенными им самим. Логика этой части воплощена в updateProcessQueueTableInRebalance.
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl
.executePullRequestImmediately(pullRequest);
}
}
Сконструированный объект PullRequest помещается в очередь через dispatchPullRequest, который фактически является источником получения всех сообщений.
Представленные здесь, были объяснены наиболее важные основные принципы и важные механизмы механизма вытягивания. Далее давайте взглянем на детали реализации после запуска действия извлечения сообщения и делегирования конкретной обработки в DefaultMQPushConsumerImpl#pullMessage(). Я собираюсь начать с трех объектов в качестве точек входа.
Описание трех важных объектов
Давайте сначала посмотрим на состав объекта запроса на вытягивание, наиболее тесно связанного с этой статьей.
public class PullRequest {
/* 消费组 */
private String consumerGroup;
/* 待拉取消费队列 */
private MessageQueue messageQueue;
/* 消息处理队列,从Broker拉取到的消息会存放至此队列,而后提交到消费者消费线程池 */
private ProcessQueue processQueue;
/* 下一次拉取的启始偏移量 */
private long nextOffset;
}
Состав атрибутов относительно прост, в основном для расширения двух других ключевых объектов, MessageQueue и ProcessQueue.
public class MessageQueue implements Comparable<MessageQueue>,
Serializable {
private String topic;
private String brokerName;
private int queueId;
}
public class ProcessQueue {
/* 消息临时存储容器 TreeMap<消息在ConsumerQueue中的偏移量,消息实体> */
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
/* ProcessQueue中消息数量 */
private final AtomicLong msgCount = new AtomicLong();
/* ProcessQueue消息占用内存总和 */
private final AtomicLong msgSize = new AtomicLong();
}
⚠️: ProcessQueue только выдержки из соответствующих свойств этой статьи, На самом деле это намного сложнее, чем это.
Чтобы облегчить объяснение значения и использования двух вышеупомянутых объектов, автор подготовил схематическую диаграмму, которую, я думаю, можно легко объяснить с помощью диаграммы.Функция MessageQueue относительно проста. ta похожа на удостоверение личности. Я могу подтвердить единственную очередь через этот объект. Следующий объект MessageQueue определяет очередь queue0 на рисунке.
{
"topic": "topic",
"brokerName": "broker0",
"queueId": 0
}
Официальное определение ProcessQueue: Моментальный снимок использования очереди, который является моментальным снимком очереди на стороне клиента. На самом деле, я не думаю, что официальное определение очень точное, на мой взгляд, ta — это фрагмент очереди, ta сохраняет некоторые сообщения в очереди, а также сохраняет другую информацию, связанную с потреблением, например, может ли она вызвать управление потоком потребления клиента, будь то ключевая информация, такая как отбрасывание.
DefaultMQPushConsumerImpl#pullMessage
В сочетании с предыдущей статьей мы уже знаем, что объект запроса для извлечения сообщений рождается в соответствующей реализации балансировки нагрузки, а также та, которая ставит в очередь запросы на извлечение, соответствующие каждой очереди, а затем поток блокируется с помощью take(). пробуждается для выполнения операции pullMessage. , метод Потребителя pullMessage помещает каждый запрос PullRequest обратно в очередь в зависимости от ситуации и так далее, и так далее, и продолжает извлекать.
Поскольку существует строгое взаимно-однозначное соответствие между PullRequest и выделенной очередью, нет необходимости слишком заботиться о генерации этого объекта, даже неограниченная очередь вполне подойдет.
Метод pullMessage слишком длинный, извлечем код ключа для анализа:
- Проверьте статус, решите, следует ли отменить или отложить запрос в зависимости от ситуации
public void pullMessage(PullRequest pullRequest) {
/* 通过 pullRequest 获取到 processQueue */
ProcessQueue processQueue = pullRequest.getProcessQueue();
/**
* 由于Consumer节点,或者Broker Queue数量变换,导致负载均衡结果变动
* 有可能之前分布在此 Consumer 节点上的 Queue 会被分配到别处,
* 此时该 Queue 对应的快照 processQueue,本节点无权消费,置于被丢弃状态
*/
if (processQueue.isDropped()) {
/*
* 这里是直接丢弃pullRequest
* 除非下一次负载均衡将该队列又分配回来,否则请求再也没有循环流转的机会
*/
return;
}
/* 检查当前Consumer运行状态是否ServiceState.RUNNING,不是则直接抛出异常 */
try {
this.makeSureStateOK();
} catch(MQClientException e) {
/* 放弃本次,默认延迟3s再次拉取,pullRequest又入队了 */
executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
/* 若当前消费者被挂起,则将拉取任务延迟1s放至 PullMessageService 的拉取队列中,结束本次拉取 */
if (this.isPause()) {
/* 同样pullRequest又入队了 */
executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
}
- Определите, нужно ли инициировать управление потоком. После срабатывания объект запроса снова будет поставлен в очередь после задержки по умолчанию 50 мс.
- Многие условия упакованы в объект запроса RemotingCommand.
- Выполнить сообщение запроса RPC
- Выполните обратный вызов, измените начальное смещение следующего сообщения о вытягивании и решите, следует ли поставить объект PullRequest в очередь немедленно или отложить в соответствии со статусом, чтобы дождаться следующего вытягивания.
в заключении:
Если эта статья была вам полезна, смело ставьте лайк👍