Глубокое понимание использования, принципа и оптимизации обычных и последовательных сообщений RocketMq.

Java

1. Предпосылки

Недавно я проводил тесты давления в системе и оптимизировал некоторые проблемы. Я получил хороший опыт оптимизации. Последующие статьи будут посвящены этому аспекту.

Самый большой выигрыш в процессе этого подавления — это некоторые оптимизации RocketMq. Сначала наша компания использовала RabbitMq, но в некоторых сценариях пикового трафика было обнаружено, что накопление очереди было серьезным, что приводило к зависанию RabbitMq. Чтобы справиться с этим сценарием, мы, наконец, представили RocketMq из Alibaba Cloud, RocketMq может обрабатывать большое количество сообщений, а стабильность службы также может быть гарантирована Alibaba Cloud. После внедрения RocketMq действительно была решена проблема простоя очереди сообщений, вызванная накоплением очереди.

Я изначально думал, что после использования RocketMq все будет без забот, но на самом деле в процессе подавления обнаружилось много проблем.Вот несколько вопросов, и каждый возьмет эти вопросы, чтобы найти ответы в тексте:

  1. В RocketMq, если очередь сообщений накопится, что будет с потребителем?
  2. Есть ли в RocketMq способ улучшить скорость потребления сообщений для обычных и последовательных сообщений?
  3. Как установить более разумное количество повторных попыток сообщения об ошибке? Отличаются ли последовательные сообщения от обычных сообщений?

2. Обычное сообщение VS последовательное сообщение

Затем RocketMq предоставляет нам различные типы сообщений для настройки:

  • Обычное сообщение: Сообщение без специальной функции.
  • Сообщения, упорядоченные по секциям: сообщения, которые потребляются в широте секции, сохраняющей порядок.
  • Сообщение глобального заказа: глобальное сообщение по заказу можно рассматривать как только один раздел, и он всегда потребляется в том же разделе.
  • Сообщения по времени/отложенные: сообщения могут быть задержаны на определенный период времени для потребления.
  • Сообщение транзакции: для двухфазных сообщений транзакции сначала подготовьте и доставьте сообщение. В это время потребление сообщения не может быть выполнено. Когда фиксация или откат выдаются на втором этапе, будет выполняться потребление или откат сообщения.

Хотя существует много видов конфигураций, по-прежнему используются общие сообщения и сообщения о порядке разделов. В последующем в основном говорится об этих двух видах новостей.

2.1 Отправка сообщения

2.1.1 Общее сообщение

Код для отправки обычного сообщения относительно прост:

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("test_group_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        
        Message msg =
                new Message("Test_Topic", "test_tag", ("Hello World").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }

Его внутренний основной код:

    private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1. 根据 topic找到publishInfo
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 如果是同步 就三次 否则就1次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // 循环
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 更新延迟
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } 
                } else {
                    break;
                }
            }
        // 省略
            
    }

Основной процесс выглядит следующим образом:

  • Шаг 1: Получите TopicPublishInfo в соответствии с темой. TopicPublishInfo содержит информацию об опубликованных сообщениях нашей темы (). Эти данные сначала получают локально. Если они недоступны локально, они будут извлекаться из NameServer, а TopicPublishInfo будет регулярно получаться каждый 20 с.

  • Шаг 2: Получите общее количество выполнений (для повторных попыток).Если метод отправки синхронный, всего будет 3 раза, а в остальных случаях только 1.

  • Шаг 3: Выберите один из MessageQueue для отправки.Концепция MessageQueue может быть эквивалентна разделу раздела Kafka, который рассматривается как минимальная степень детализации отправки сообщений. Есть два способа выбрать это:

    • Выберите в соответствии с задержкой отправки.Если брокер, отправленный в последний раз, доступен, выберите его из текущего цикла обхода выбора брокера.Если он недоступен, вам нужно выбрать брокера с наименьшей задержкой и выбрать MessageQueue из текущего брокера.
    • Выберите MessageQueue с помощью циклического обучения.
  • Шаг 4: Отправка сообщения в Брокер на выбранной MessageQueue.

  • ШАГ 5: Обновите задержку брокера.

  • Шаг 6: Обработайте результат в соответствии с различными способами отправки:

    • Асинхронный: отправляйте асинхронно и заботьтесь о результате через callBack, поэтому здесь он не обрабатывается.
    • OneWay: как следует из названия, это односторонняя отправка, ее нужно только отправить брокеру, и вам не нужно заботиться о результате, здесь даже не нужен обратный вызов.
    • Синхронизация: отправьте синхронно, вам нужно позаботиться о результате, решить, нужно ли вам повторить попытку в соответствии с результатом, а затем вернуться к шагу 3.

Как видно, процесс отправки обычных сообщений Rocketmq относительно прост и понятен Давайте рассмотрим последовательные сообщения.

2.1.2 Последовательное сообщение

Сообщения о последовательности делятся на сообщения о последовательности раздела и сообщения о глобальной последовательности. Сообщения о глобальной последовательности легче понять, то есть, какое сообщение поступает первым, какое сообщение будет обработано первым, что соответствует нашему FIFO. Во многих случаях стоимость внедрения глобальные сообщения очень высоки, поэтому появляется сообщение о порядке разделов. Концепция разделенных последовательных сообщений может быть показана на следующей диаграмме:

Мы хешируем ключ сообщения, и сообщения с таким же хешем будут назначены одному и тому же разделу.Конечно, если мы хотим сделать глобальное последовательное сообщение, нам нужен только один раздел, поэтому стоимость глобального последовательного сообщения относительно большой. .

Друзья, знакомые с RocketMq, обнаружат, что на самом деле он не предоставляет API, связанный с последовательной отправкой сообщений, но версия Alibaba Cloud для RocketMq предоставляет API для последовательных сообщений.Принцип относительно прост, и на самом деле это инкапсуляция существующий API:

SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg,
                    Object shardingKey) {
                    int select = Math.abs(shardingKey.hashCode());
                    if (select < 0) {
                        select = 0;
                    }
                    return mqs.get(select % mqs.size());
                }
            }, shardingKey);

Видно, что последовательные сообщения оставляют выбор MessageQueue нашему отправителю, поэтому мы напрямую используем hashCode нашего shardingKey для отправки разделов.

3.1 Использование сообщений

3.1.1 Общее сообщение

Общие сообщения относительно просты в использовании, как показано в следующем коде:

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test_Consumer");
        consumer.subscribe("TopicTest", "*");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(10);
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
  • Шаг 1: Сначала создайте DefaultMQPushConsumer и зарегистрируйте соответствующую информацию Topic и NameServer.
  • Шаг 2: Зарегистрируйте прослушиватель сообщений.В RocketMq есть два прослушивателя сообщений, один — MessageListenerConcurrently, который используется для одновременного потребления обычных сообщений, а другой — MessageListenerOrderly, который используется для наших последовательных сообщений. Здесь мы используем MessageListenerConcurrently.
  • Шаг 3: Установите размер ConsumeThread, чтобы управлять нашим пулом потоков для его использования.
  • Шаг 4: Запустите Потребитель.

После запуска Потребителя мы начинаем потреблять из Брокера, но как мы потребляем из Брокера? Прежде всего, на нашем первом этапе мы подписываемся на тему, мы будем регулярно обновлять информацию, связанную с темой, такую ​​как изменения в MessageQueue, а затем назначать соответствующую MessageQueue текущему потребителю:

                // 这个数据 是10s更新一次 从内存中获取
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 这个数据实时去拉取
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                    //通过默认的分配策略进行分配
                        allocateResult =
                                strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll,
                                        cidAll);
                    } catch (Throwable e) {
                        log.error(
                                "AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                                strategy.getName(), e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);

Здесь сначала получите ConsumerId всего текущего потребления от Брокера. По умолчанию используется IP + имя экземпляра соответствующей машины. Информация о ConsumerId в Брокере сообщается Потребителем через время сердцебиения, а затем в соответствии с потреблением стратегия распределения Сообщение выделяется Потребителю, значение по умолчанию здесь равно Равномерно распределять, записывать очереди сообщений, которым мы назначены, в В processQueueTable, если есть новое дополнение, нам нужно создать PullRequest для представления запроса на получение сообщения и его асинхронную обработку:

        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                }

                this.removeDirtyOffset(mq);
                ProcessQueue pq = new ProcessQueue();
                // 这里就是获取我们第一次应该拿什么offset
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }
        }

        this.dispatchPullRequest(pullRequestList);

В PullService данные будут непрерывно извлекаться из PullRequestQueue, а затем данные будут извлекаться.

        while (!this.isStopped()) {
            try {
                // rebalance 之后第一次向这个队列放数据 后续消费的时候会继续放
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

После извлечения данных это ответит на PullCallBack:

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            

                            

Если сообщение успешно извлечено здесь, мы сначала сохраняем извлеченное сообщение в нашу ProcessQueue, которая используется для статуса обработки нашего потребителя и ожидающих сообщений, а затем отправляем его в наш пул потоков Consumer для реального потребления бизнес-логики, а затем отправить PullRequest для нашего следующего потребления.

Вы видели, что этот режим похож на однопоточный аккпет в нашей сети, который обрабатывает бизнес-логику несколькими потоками, и принцип тот же: он непрерывно тянется одним потоком, а затем определяется нашим бизнесом. пул для обработки. Как показано ниже:

Мы обнаружили, что процесс извлечения сообщений на самом деле является циклическим процессом. Здесь возникает первая проблема. Если скорость потребления очереди сообщений не может соответствовать скорости отправки сообщений, будет происходить накопление сообщений. Многие студенты могут увидеть возможность в соответствии с Мы могли бы подумать, что наши сообщения о вытягивании продолжаются, потому что скорость нашего потребления относительно низкая, в нашей памяти будет много сообщений в виде очередей, из-за чего наша JVM будет выглядеть как OOM, то есть переполнение памяти.

Так появится ли ООМ? На самом деле нет, RocketMq проделал хорошую работу по обеспечению безопасности со следующими двумя фрагментами кода:

        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            System.out.println(cachedMessageCount + ":"+pullRequest);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }

        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }

Во-первых, нужно определить, превышает ли количество сообщений в текущем кеше памяти ограничение.Значение по умолчанию — 1000. Если оно больше, запрос на вытягивание будет отправлен снова после задержки на определенный период времени. Затем оцените, превышает ли размер текущего кеша памяти определенное значение, по умолчанию — 100 МБ. Если он больше, повторная отправка запроса на вытягивание будет задержана на некоторое время. Поэтому, если на нашего потребителя происходит накопление сообщений, это в основном не имеет никакого эффекта.

Итак, давайте подумаем, как решить вторую проблему? В сценарии обычных сообщений как увеличить скорость потребления?

  • Прежде всего, мы должны улучшить нашу собственную скорость обработки, поскольку скорость обработки увеличивается, скорость потребления, естественно, увеличивается.
  • Во-вторых, настроить пул потребительских потоков разумного размера. Если он слишком мал, ресурсы машины не будут использоваться полностью. Если он слишком велик, переключение контекста потока может быть очень быстрым. говоря, это оценивается в соответствии с бизнесом потребителя.Если это процессор, хорошо установить размер процессора для интенсивных потоков.Если это интенсивный ввод-вывод, установите удвоенный размер процессора.
  • Другим является MessageQueue.Внимательные студенты должны выделить MessageQueue для получения и потребления, прежде чем они увидят, что наши потребители потребляют сообщения выше, поэтому естественно думать, что если мы выделим немного больше MessageQueue, это ускорит нашу скорость потребления. MessageQueue мало помогает для нашего обычного улучшения потребления сообщений, потому что все запросы на потребление будут отправлены в пул потоков для потребления, и никакое количество MessageQueue не поможет, если только у нас много компьютеров-потребителей, количество MessageQueues меньше чем Когда используется машина-потребитель, добавление MessageQueue в это время будет иметь эффект улучшения.Это так называемый пусть наша машина дождь и роса равномерно.

3.1.1.1 Обработка результатов обычного потребления сообщений

Обработка результатов потребления сообщений в RocketMQ также более важна.Вот три вопроса:

  • Как наши обычные сообщения обрабатывают результат?
  • Что делать, если потребление не работает?
  • Когда потребляются обычные сообщения, они обрабатываются одновременно.Если сообщения с более поздними смещениями потребляются первыми, но сообщения с более ранними смещениями не потребляются, в это время происходит простой, а сообщения с более ранними смещениями обрабатываются. не потребляется. Будут ли некоторые данные потеряны? То есть следующее потребление начнется со смещения, которое не было потреблено? Если нет, то как RocketMQ это делает?

Для начала разберемся с первым вопросом, как работать с результатом потребления, в processResult есть следующий код:

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();
        switch (status) {
            case CONSUME_SUCCESS:
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
  • Шаг 1: Сначала получите ackIndex, то есть количество успешных подтверждений.По умолчанию установлено максимальное количество целых чисел, что означает все успехи.
  • Шаг 2: Получите ConsumeConcurrentlyStatus и обработайте его в соответствии с разными состояниями.Существует два ConsumeConcurrentlyStatus:
    • CONSUME_SUCCESS: представляет успешное потребление, записывает успешные TPS и неудачные TPS.
    • RECONSUME_LATER: указывает на необходимость повторного использования. Как правило, это состояние возвращается после сбоя, а неудачный TPS регистрируется.
  • Шаг 3: Затем, согласно типу сообщения, выполняется разные логические повторные попытки. Есть два типа потребления сообщений:
    • Вещание: расходы на вещание, вещание не попробует еще раз, здесь будет напрямую ударить в журнал предупреждения и отбросить его.
    • КЛАСТЕРИЗАЦИЯ: Использование кластера. Неудачное сообщение будет отправлено обратно в текущую тему в первую очередь. Если отправка не удалась, локальная обработка будет продолжена. Если в Брокере будет обнаружено, что количество повторных попыток для этого сообщения достигло верхнего предела, сообщение будет отправлено в RetryTopic, которое затем будет отправлено RetryTopic в очередь недоставленных сообщений.
  • Шаг 4: Получите смещение сообщения и обновите текущий прогресс потребления

На четвертом шаге выше, если не углубляться во внутреннюю логику, вы ошибочно подумаете, что он обновит смещение текущего сообщения до последнего прогресса потребления, тогда среднее смещение, упомянутое в вопросе 3, может быть потеряно. , но на самом деле этого не происходит.Конкретная логика гарантируется в removeMessage:

    public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }
        return result;
    }

В removeMessage гарантия делается через msgTreeMap.msgTreeMap это TreeMap, который отсортирован в порядке возрастания по смещению.Если есть значение в treeMap, то возвращаемое им смещение будет firstKey в текущем msgTreeMap вместо текущего смещения , тем самым решая проблему Вопрос третий.

Вышеупомянутый процесс резюмируется, как показано на следующем рисунке:

3.1.2 Сообщения последовательности

Передний процесс последовательного потребления сообщений в основном так же, как и у обычных сообщений. Что нам нужно сосредоточиться на здесь, является логика после того, как сообщение брошено в наш пул потребительского потока:

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                // 省略
                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                // 省略
            }

Можно обнаружить, что здесь есть еще один шаг, чем обычные сообщения, то есть блокировка, Здесь мы получим блокировку с messageQueue в качестве широты, а затем перейдем к нашей processQueue, чтобы получить наше сообщение, которое также получается с помощью наш msgTreeMap.Message с минимальным смещением.

Итак, наша предыдущая стратегия повышения скорости параллелизма пула потоков здесь бесполезна, так что же нам делать? Поскольку наша блокировка основана на очереди сообщений в качестве широты, было бы хорошо увеличить очередь сообщений, поэтому увеличение скорости потребления здесь прямо противоположно скорости обычного сообщения.Улучшение очереди сообщений в обычном сообщении может быть неэффективным. так здорово, но улучшено потребление последовательных сообщений.Это огромно.

Во время стресс-теста мы обнаружили, что последовательное потребление сообщений очень медленное, а накопление сообщений очень серьезное.После отладки мы обнаружили, что очередь чтения и записи по умолчанию для RocketMQ в облаке Alibaba составляет 16, у нас есть 10 машин-потребителей и размер каждого пула потребительских потоков равен 10. Теоретический параллелизм должен быть 100, но из-за последовательного сообщения фактический параллелизм составляет только 16. Наконец, я попросил технический персонал Alibaba увеличить очередь чтения и записи до 100, что делает полное использование наших ресурсов и значительно увеличивает скорость последовательного потребления сообщений.Сообщения в основном больше не накапливаются.

3.1.2.1 Последовательная обработка результатов потребления сообщений

Результат последовательного сообщения обрабатывается, и поток обработки обычных сообщений, немного отличается, код выглядит следующим образом:

public boolean processConsumeResult(
        final List<MessageExt> msgs,
        final ConsumeOrderlyStatus status,
        final ConsumeOrderlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit()) {
            switch (status) {
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        commitOffset = consumeRequest.getProcessQueue().commit();
                    }
                    break;
                default:
                    break;
            }
        } else {
            switch (status) {
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }
        }

        if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        }

        return continueConsume;
    }
  • Шаг 1: Определите, будет ли текущее смещение автоматически отправляться для обновлений.Как правило, autoCommit не нужно устанавливать.По умолчанию используется автоматическая отправка.Такая настройка будет выполнена, если нет особых требований.
  • Шаг 2: Если он отправлен автоматически, вам нужно оценить статус:
    • УСПЕХ: если это успешное состояние, получите смещение, которое необходимо отправить в данный момент, а затем запишите его в OK TPS.
    • SUSPEND_CURRENT_QUEUE_A_MOMENT: обратите внимание, что RECONSUME_LATER будет возвращен, если в обычных сообщениях произойдет сбой, в чем разница? В этом состоянии он не будет повторно отправлять текущую тему, а снова отправит ConsumeRequest в локальный пул потоков, задержит повторную попытку, и время по умолчанию здесь равно 1 с. Если оно превышает максимальное количество повторных попыток, данные будут отправлены в RetryTopic.
  • Шаг 3: Аналогичен шагу 2, если он не отправляется автоматически, но отправленное смещение не будет получено.
  • Шаг 4: Обновите смещение.

Здесь мы возвращаемся к третьему вопросу, как установить количество повторных сообщений о потреблении этого сообщения? Поскольку Ali Cloud MQ US напрямую, поэтому мы упаковали слой, легкий доступ. Затем у меня изначально слой доступа мы изначально единой конфигурации максимально повторяют 2000 раз, в 2000 раз причинах, установленные здесь, в основном хотят наш очередь сообщений как можно больше, чтобы бесконечно повторить попытку, потому что мы в принципе по умолчанию в конечном итоге добиваются успеха, но в порядке, если во-первых, Здесь мы установили большее значение 2000 раз. Установите 2000 для нашего общего сообщения, в основном без воздействия, потому что он будет повторно доставляется в брокер, но наше сообщение о заказе не работает, если порядок в 2000 раз сообщение установлено повторную попытку, когда столкнутся с таким сообщением, не может Удачное время приведет к тому, что сообщение было получено на месте, и из-за блокировки очереди, поэтому текущая учетная левка всегда будет заблокирована, что приведет к последующим сообщениям, не будет потребляться, если установлено не менее чем в 2000 раза меньше, чем полчаса, будет заблокирован. Таким образом, сообщение здесь должно быть порядок небольшого набора значений, мы теперь установлены на 16.

4. Наконец

Раньше я не очень много читал исходный код Rocketmq. После этого подавления я узнал много изысканных и отличных дизайнов от Rocketmq и перенес некоторый опыт на некоторые вопросы в тексте. Надеюсь, вы внимательно его прочитаете и найдете ответ .

Если вы считаете, что эта статья полезна для вас, то ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O:

å