Краткий анализ сообщения RocketMQ «Push» (часть 1)

Java RocketMQ
Краткий анализ сообщения RocketMQ «Push» (часть 1)

Прошлый опыт говорит нам, что для режима push часто требуется длинная связь между брокером и потребителем.После того, как производитель успешно отправит сообщение брокеру, брокер продолжит активно отправлять сообщение потребителю.

Реализация Roket MQ использует другой подход: push-режим ta по сути является оболочкой для pull-режима. Автоматизируйте действие извлечения вручную, передав его выделенному потоку.

Это означает, что в Rocket MQ нет реального режима push.

Начните с PullMessageService

Ключом к автоматизации извлечения сообщений является PullMessageService. Обычно знание класса из отношения наследования ta дает огромную награду, но иногда мы не можем начать из-за сложного отношения наследования.К счастью, система наследования ta относительно проста, что помогает нам выяснить контекст как как можно скорее.image.pngИз диаграммы UML мы можем естественным образом сгенерировать следующие ассоциации:

  1. Реализуйте интерфейс Runnable, возможно, метод запуска является записью для автоматического извлечения сообщений.
  2. Основываясь на первом пункте, мы делаем вывод, что может быть поток, который молча поддерживает работу метода run.
  3. Основываясь на первых двух пунктах и ​​соглашениях об именах, у нас есть основания полагать, что ServiceThread должен быть связан с потоками.
public abstract class ServiceThread implements Runnable {

    /* 内部持有一个线程 */
    private Thread thread;
    /* 线程安全的boolean类型 */
    private final AtomicBoolean started = new AtomicBoolean(false);
    
    public void start() {
        /* started保证了线程只会被启动一次 */
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        /* getServiceName()抽象方法交由子类实现,用以获取线程名称 */
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }
    
}

Исходный код ServiceThread идеально соответствует нашему предыдущему предположению, поэтому мы можем сместить фокус на метод запуска PullMessageService.

public class PullMessageService extends ServiceThread {

    /* PullRequest队列 */
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue 
        = new LinkedBlockingQueue<>();
    
    @Override
    public void run() {
        /* 每次拉取消息都要检测自身状态,改变stopped的状态可以停止拉取消息逻辑 */
        while (!this.isStopped()) {
            /* pullRequestQueue 队列为空会阻塞 */
            PullRequest pullRequest = this.pullRequestQueue.take();
            /* 进行消息拉取 */
            this.pullMessage(pullRequest);
        }
    }
}

Очевидно, что это модель производитель-потребитель. Когда в очереди pullRequestQueue есть объект PullRequest, метод run извлечет объект и выполнит логику извлечения.Если очередь производителя пуста, заблокируйте очередь и приостановите поведение потребления. Детали извлечения сообщений до сих пор не были известны, давайте взглянем на метод pullMessage.

public class PullMessageService extends ServiceThread {

    private final MQClientInstance mQClientFactory;
    
    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    private void pullMessage(PullRequest pullRequest) {
        MQConsumerInner consumer = this.mQClientFactory
            .selectConsumer(pullRequest.getConsumerGroup());
        
        DefaultMQPushConsumerImpl impl = 
            (DefaultMQPushConsumerImpl) consumer;
            
        impl.pullMessage(pullRequest);
    }

}

Другими словами, объект mQClientFactory должен передаваться, когда PullMessageService создает экземпляр объекта. Через MQClientInstance#selectConsumer() можно запросить объект-потребитель, соответствующий текущему запросу pullRequest, а затем выполнить логику принудительной передачи.Прямая принудительная передача здесь связана с тем, что объект PullMessageService будет использоваться только в режиме push. Затем делегируйте логику получения сообщений DefaultMQPushConsumerImp#pullMessage().

Слишком много незнакомых объектов сыпется в этот момент.Если автор не объяснит это, боюсь, читателям будет сложно это принять.Просто проанализируйте класс MQClientInstance, соответствующий объекту mQClientFactory.

MQClientInstance

Во-первых, давайте посмотрим, как рождается объект MQClientInstance.При запуске каждого Consumer переменная-член mQClientFactor будет заполнена, но MQClientInstance спроектирован как одноэлементный режим. Все потребители и производители в JVM имеют один и тот же экземпляр MQClientInstance, и потребитель в конце зарегистрируется в mQClientFactory.

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    private MQClientInstance mQClientFactory;

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: {
                /* 初始化 MQClientInstance */
                mQClientFactory = MQClientManager.getInstance()
                    .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                mQClientFactory.registerConsumer(
                    this.defaultMQPushConsumer.getConsumerGroup(), this
                );
                /* 虽然每个Consumer都调用此方法,但只启动一次 */
                mQClientFactory.start()
            }
            default:
                break;
        }
    }

}

Короче говоря, это информация о потребителе переменной-члена ConsumerTable сопровождающего экземпляра MQClientInstance, ключ — это имя ConsumerGroup, а значение — это сам потребитель.

public class MQClientInstance { 
    
    private final ConcurrentMap<String, MQConsumerInner> consumerTable 
        = new ConcurrentHashMap<>();
        
    public boolean registerConsumer(String group, MQConsumerInner consumer) {
        if (null == group || null == consumer) {
            return false;
        }

        MQConsumerInner prev = this.consumerTable
            .putIfAbsent(group, consumer);
        return true;
    }

}

Таким образом, в приведенном выше примере вы можете запросить соответствующий объект-потребитель в соответствии с именем ConsumerGroup через объект MQClientInstance. Consumer и MQClientInstance ссылаются друг на друга, образуя циклическую зависимость.image.png

Инициализация и время запуска PullMessageService

До сих пор наш PullMessageService по-прежнему был обычным объектом, потому что поток без start() — это как любовь без материала, просто тарелка рыхлого песка, и ее развеет без ветра.

  • Мы всегда знали, что PullMessageService может запрашивать потребителей, но мы не упомянули об этом, когда анализировали исходный код запуска Consumer.
  • В то же время, если вы суммируете и обнаружите, что несколько потребителей в одной и той же JVM, похоже, используют один и тот же объект PullMessageService, в противном случае нет необходимости искать.

Глядя на переписку, я чувствую, что PullMessageService должен быть связан с MQClientInstance.

public class MQClientInstance {
   
    private final PullMessageService pullMessageService;
    
    public MQClientInstance(ClientConfig clientConfig, 
        int instanceIndex, String clientId, RPCHook rpcHook) {
        this.rebalanceService = new RebalanceService(this);
    }
    
   
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.rebalanceService.start();
                    break;
                default:
                    break;
            }
        }
    }

}

Конечно же, объект MQClientInstance выполняет метод построения, и объект pullMessageService инициализируется.Во время выполнения метода запуска вызывается метод pullMessageService.start() для запуска потока, извлекающего сообщение.

О производстве

Выше автор представил, кто отвечает за извлечение сообщений, какой механизм используется для их извлечения и как запускается поток извлечения.Однако, поскольку PullMessageService является типичной моделью производства-потребления, мы вводим только процесс потребления.

  • Как производственная очередь получает запросы на извлечение сообщений?
  • Когда появился объект запроса на включение?
  • LinkedBlockingQueue — неограниченная очередь, будет ли риск переполнения памяти?

Этот вопрос задерживается в моей голове один за другим, задерживаясь.

Замечено, что производственная очередь является приватно-модифицированной переменной-членом, и внешний мир не имеет разрешения на доступ к ней, поэтому основная идея состоит в том, чтобы проверить метод работы с этим свойством в PullMessageService, и мы должны получить ответ.

public class PullMessageService extends ServiceThread {

    public void executePullRequestLater(PullRequest pullRequest, 
        long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(
                () -> executePullRequestImmediately(pullRequest),
                timeDelay,
                TimeUnit.MILLISECONDS
            );
        }
    }

    public void executePullRequestImmediately(PullRequest pullRequest) {
        this.pullRequestQueue.put(pullRequest);  
    }
    
}

Вызов этих двух методов будет генерировать объекты запроса на извлечение для очереди, один из которых будет немедленно заполнен в очереди, а другой будет отправлен в ScheduledExecutorService, а логика размещения будет выполнена после небольшой задержки timeDelay.

Отслеживая весь путь, я обнаружил, что все записи вызовов являются методами executePullRequestLater и executePullRequestImmediately DefaultMQPushConsumerImpl.Метод очень прост: получить объект извлечения сообщения через объект mQClientFactory, который хранится сам по себе, а затем вызвать метод постановки в очередь объект.

private void executePullRequestLater(PullRequest pullRequest, 
    long timeDelay) {
    this.mQClientFactory.getPullMessageService()
        .executePullRequestLater(pullRequest, timeDelay);
}

public void executePullRequestImmediately(PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService()
        .executePullRequestImmediately(pullRequest);
}

Отношения вызова следующие:

image.png image.pngВидно, что почти все вызовы существуют в методе pullMessage, и видите ли вы процесс построения экземпляра PullRequest в этом методе, после исключения остается только класс реализации балансировки нагрузки RebalancePushImpl.

Отслеживая весь путь, оказывается, что объект запроса на вытягивание был построен в родительском классе RebalanceImpl для RebalancePushImpl.

Конкретный принцип заключается в том, что RebalanceImpl отвечает за балансировку нагрузки ConsumeQueue, поэтому RebalanceImpl всегда знает, в какие очереди каждый потребитель выделяется в первый раз, поэтому он может создавать объекты PullRequest в соответствии с очередями, выделенными им самим. Логика этой части воплощена в updateProcessQueueTableInRebalance.

public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl
            .executePullRequestImmediately(pullRequest);
    }
}

      Сконструированный объект PullRequest помещается в очередь через dispatchPullRequest, который фактически является источником получения всех сообщений.

Представленные здесь, были объяснены наиболее важные основные принципы и важные механизмы механизма вытягивания. Далее давайте взглянем на детали реализации после запуска действия извлечения сообщения и делегирования конкретной обработки в DefaultMQPushConsumerImpl#pullMessage(). Я собираюсь начать с трех объектов в качестве точек входа.

Описание трех важных объектов

Давайте сначала посмотрим на состав объекта запроса на вытягивание, наиболее тесно связанного с этой статьей.

public class PullRequest {

    /* 消费组 */
    private String consumerGroup;

    /* 待拉取消费队列 */
    private MessageQueue messageQueue;

    /* 消息处理队列,从Broker拉取到的消息会存放至此队列,而后提交到消费者消费线程池 */
    private ProcessQueue processQueue;

    /* 下一次拉取的启始偏移量 */
    private long nextOffset;
    
}

Состав атрибутов относительно прост, в основном для расширения двух других ключевых объектов, MessageQueue и ProcessQueue.

public class MessageQueue implements Comparable<MessageQueue>, 
    Serializable {

    private String topic;
    private String brokerName;
    private int queueId;
    
}
public class ProcessQueue {

    /* 消息临时存储容器 TreeMap<消息在ConsumerQueue中的偏移量,消息实体> */
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
    /* ProcessQueue中消息数量 */
    private final AtomicLong msgCount = new AtomicLong();
    /* ProcessQueue消息占用内存总和 */
    private final AtomicLong msgSize = new AtomicLong();

}

⚠️: ProcessQueue только выдержки из соответствующих свойств этой статьи, На самом деле это намного сложнее, чем это.

Чтобы облегчить объяснение значения и использования двух вышеупомянутых объектов, автор подготовил схематическую диаграмму, которую, я думаю, можно легко объяснить с помощью диаграммы.image.pngФункция MessageQueue относительно проста. ta похожа на удостоверение личности. Я могу подтвердить единственную очередь через этот объект. Следующий объект MessageQueue определяет очередь queue0 на рисунке.

{
    "topic": "topic",
    "brokerName": "broker0",
    "queueId": 0
}

Официальное определение ProcessQueue: Моментальный снимок использования очереди, который является моментальным снимком очереди на стороне клиента. На самом деле, я не думаю, что официальное определение очень точное, на мой взгляд, ta — это фрагмент очереди, ta сохраняет некоторые сообщения в очереди, а также сохраняет другую информацию, связанную с потреблением, например, может ли она вызвать управление потоком потребления клиента, будь то ключевая информация, такая как отбрасывание.

DefaultMQPushConsumerImpl#pullMessage

В сочетании с предыдущей статьей мы уже знаем, что объект запроса для извлечения сообщений рождается в соответствующей реализации балансировки нагрузки, а также та, которая ставит в очередь запросы на извлечение, соответствующие каждой очереди, а затем поток блокируется с помощью take(). пробуждается для выполнения операции pullMessage. , метод Потребителя pullMessage помещает каждый запрос PullRequest обратно в очередь в зависимости от ситуации и так далее, и так далее, и продолжает извлекать.

Поскольку существует строгое взаимно-однозначное соответствие между PullRequest и выделенной очередью, нет необходимости слишком заботиться о генерации этого объекта, даже неограниченная очередь вполне подойдет.

Метод pullMessage слишком длинный, извлечем код ключа для анализа:

  1. Проверьте статус, решите, следует ли отменить или отложить запрос в зависимости от ситуации
public void pullMessage(PullRequest pullRequest) {
    /* 通过 pullRequest 获取到 processQueue */
    ProcessQueue processQueue = pullRequest.getProcessQueue();
    /**
     * 由于Consumer节点,或者Broker Queue数量变换,导致负载均衡结果变动
     * 有可能之前分布在此 Consumer 节点上的 Queue 会被分配到别处,
     * 此时该 Queue 对应的快照 processQueue,本节点无权消费,置于被丢弃状态
     */
    if (processQueue.isDropped()) {
        /* 
         * 这里是直接丢弃pullRequest
         * 除非下一次负载均衡将该队列又分配回来,否则请求再也没有循环流转的机会
         */
        return;
    }

    /* 检查当前Consumer运行状态是否ServiceState.RUNNING,不是则直接抛出异常 */
    try {
        this.makeSureStateOK();
    } catch(MQClientException e) {
        /* 放弃本次,默认延迟3s再次拉取,pullRequest又入队了 */
        executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
    
    /* 若当前消费者被挂起,则将拉取任务延迟1s放至 PullMessageService 的拉取队列中,结束本次拉取 */
    if (this.isPause()) {
        /* 同样pullRequest又入队了 */
        executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
}
  1. Определите, нужно ли инициировать управление потоком. После срабатывания объект запроса снова будет поставлен в очередь после задержки по умолчанию 50 мс.
  2. Многие условия упакованы в объект запроса RemotingCommand.
  3. Выполнить сообщение запроса RPC
  4. Выполните обратный вызов, измените начальное смещение следующего сообщения о вытягивании и решите, следует ли поставить объект PullRequest в очередь немедленно или отложить в соответствии со статусом, чтобы дождаться следующего вытягивания.

в заключении:image.png

Если эта статья была вам полезна, смело ставьте лайк👍