Проблема расположение задержки последовательной потребления RocketMQ RocketMQ

Java RocketMQ
Проблема расположение задержки последовательной потребления RocketMQ RocketMQ

«Эта статья участвовала в мероприятии Haowen Convocation Order, щелкните, чтобы просмотреть:Двойные заявки на внутреннюю и внешнюю стороны, призовой фонд в 20 000 юаней ждет вас, чтобы бросить вызов!"

Проблема с задержкой последовательного потребления RocketMQ

Предыстория и явление проблемы

Прошлой ночью я получил сигнал тревоги приложения и обнаружил, что определенное сообщение о потреблении бизнеса в Интернете было задержано более чем на 54 секунды (интервал от сообщения, отправленного в MQ, до времени его использования):

2021-06-30T23:12:46.756 message processing is incredibly delayed! (Current delay time: 54725, incredible delay count in 10 seconds: 5677) 

Глядя на мониторинг RocketMQ, обнаруживается, что действительно существует относительно большое отставание сообщений:image

Просмотр пользователей тем из RocketMQ-Console:image

Для этой темы бизнес-требованиянужно заказатьиз. Поэтому при отправке указывайтеБизнес-ключ, а при потреблении с помощьюсхема последовательного потребления.

Мы используем кластер RocketMQ с тремя брокерами.Для этой темы у каждого брокера есть 8 ReadQueues и WriteQueues. Вот краткое упоминание о значении ReadQueue и WriteQueue:

В RocketMQ,При отправке сообщения используйте количество WriteQueues для возврата информации о маршрутизации.,Когда сообщение потребляется, информация о маршрутизации возвращается в соответствии с количеством ReadQueues.. На уровне физического файла только WriteQueues создают файлы. Например: установите WriteQueueNum = 8, ReadQueueNum = 4, будет создано 8 папок, представляющих 8 очередей 0 1 2 3 4 5 6 7, но когда сообщение используется, информация о маршрутизации возвращает только 4, в конкретном запросе. сообщение отправляется, потребляются только сообщения в четырех очередях 0 1 2 3, а 4 5 6 7 не потребляются вообще. И наоборот, если вы установите WriteQueueNum = 4 и ReadQueueNum = 8, при создании сообщений будут создаваться только сообщения в 0 1 2 3, а при потреблении сообщений они будут потребляться из всех очередей 0 1 2 3 4 5 6 7, конечно 4 В 5 6 7 вообще нет сообщений.Предполагая, что потребление является групповым потреблением, в группе есть два потребителя.На самом деле только первый потребитель фактически потребляет сообщения (0 1 2 3), а второй второго потребителя вообще нет.Невозможно использовать сообщение (4 5 6 7).Как правило, мы установим эти два значения одинаковыми., они будут установлены по-разному только тогда, когда количество очередей в топике необходимо уменьшить.

анализ проблемы

Первое, что приходит на ум, это то, чтоПотребительская нить застряла?Потоки застревают, как правило, потому что:

  1. Остановка мира случилась:
  2. GC причины
  3. Вызвано другими причинами точки сохранения (такими как jstack, синхронизированный вход в точку сохранения и т. д., см. эту мою статьюСвязано с JVM - комплексное решение SafePoint и Stop The World)
  4. Поток, обрабатывающий сообщение, занимает слишком много времени, блокировка может не быть получена, и он может застрять в каком-то IO.

коллекция в то времяJFR(Для JFR, пожалуйста, обратитесь к моей другой серииПолное решение JFR),Обнаружить:

  1. не произошло за это времядолгий застойGC и другие события Safepoint Stop-the-world:

image

image

  1. В это время поток припаркован, а отображение стекаУ потребляющего потока нет сообщений для потребления:

image

Поскольку проблем с приложением нет, давайте посмотрим, есть ли проблемы с RocketMQ. Нас интересуют общие журналы RocketMQ Broker:

  1. Статистика потребления времени сохранения сообщения, если здесь возникает исключение, нам необходимо настроить параметры, связанные с Java MMAP, см.:
  2. Исключение сохранения сообщения, проверьте storeerr.log
  3. Исключение блокировки, проверьте lock.log

Так к какому брокеру мне обратиться? Как упоминалось ранее, hashKey указывается при отправке в эту тему, и мы можем определить, какой брокер находится через hashKey сообщения:

int hashCode = "我们的hashKey".hashCode();
log.info("{}", Math.abs(hashCode % 24));

Мы нашли hashKey сообщения.По вышеприведенному коду получается 20, то есть очередь 20. Согласно предыдущему описанию,Мы знаем, что у каждого брокера 8 очередей, а 20 соответствует очереди на брокере-2, то есть,брокер-2 queueId = 5 эта очередь. Давайте рассмотрим проблему с расположением логов на брокере-2.

Мы обнаружили, что в lock.log есть исключения, как показано ниже, похожих много, и они длились около 54 с, что согласуется с временем парковки потока и задержкой сообщения:

2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: 消费group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic=消息topic, brokerName=broker-2, queueId=5]

Этот журнал означает,10.238.18.122@29Попытка этого экземпляра заблокировать очередь с идентификатором очереди = 5 не удалась, поскольку10.238.18.6@29держит этот замок. Так почему же это происходит?

Принцип последовательного потребления RocketMQ с несколькими очередями

Если RocketMQ хочет добиться последовательного потребления нескольких очередей, сначала необходимо указать hashKey. Сообщение будет помещено в определенную очередь с помощью hashKey. Когда потребители используют эту очередь, если указано последовательное потребление, дапотребление одного потока, так что порядок в той же очереди гарантирован.

Так как же обеспечить, чтобы каждая очередь использовалась одним потоком? Каждый брокер поддерживает один:

private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

ОнConcurrentMap<消费组名称, ConcurrentHashMap<消息队列, 锁对象>>. Объект блокировки LockEntry включает в себя:

RebalanceLockManager.java:

//读取 rocketmq.broker.rebalance.lockMaxLiveTime 这个环境变量,默认 60s
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
static class LockEntry {
    //RocketMQ 客户端唯一 id
    private String clientId;
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
    
    //省略getter setter
    
    public boolean isLocked(final String clientId) {
        boolean eq = this.clientId.equals(clientId);
        return eq && !this.isExpired();
    }

    public boolean isExpired() {
        // 在 REBALANCE_LOCK_MAX_LIVE_TIME 这么长时间后过期
        boolean expired =
            (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

        return expired;
    }
}

Клиент RocketMQ отправляетLOCK_BATCH_MQКогда запрос сделан к брокеру, брокер инкапсулирует запрос клиента как LockEntry и попытается обновить карту.Если обновление прошло успешно, блокировка получена, а в случае сбоя блокировка не получена. Подробная логика обновления брокера (Если вам интересно, вы можете просмотреть его, или вы можете пропустить его напрямую, не влияя на понимание.Есть изображения, которые легко понять.):

public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
    //判断没有已经锁住
    if (!this.isLocked(group, mq, clientId)) {
        try {
            //获取锁,这个锁是实例内的,因为每个 broker 维护自己的队列锁表,并不共享
            this.lock.lockInterruptibly();
            try {
                //尝试获取,判断是否存在,存在就判断是否过期
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null == groupValue) {
                    groupValue = new ConcurrentHashMap<>(32);
                    this.mqLockTable.put(group, groupValue);
                }
                
                LockEntry lockEntry = groupValue.get(mq);
                if (null == lockEntry) {
                    lockEntry = new LockEntry();
                    lockEntry.setClientId(clientId);
                    groupValue.put(mq, lockEntry);
                    log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                        group,
                        clientId,
                        mq);
                }

                if (lockEntry.isLocked(clientId)) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                    return true;
                }

                String oldClientId = lockEntry.getClientId();

                if (lockEntry.isExpired()) {
                    lockEntry.setClientId(clientId);
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                    log.warn(
                        "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                        group,
                        oldClientId,
                        clientId,
                        mq);
                    return true;
                }
                //这里就是我们刚刚看到的日志
                log.warn(
                    "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                    group,
                    oldClientId,
                    clientId,
                    mq);
                return false;
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }
    } else {

    }

    return true;
}
//判断是否是已经锁住了
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
    //通过消费组名称获取
    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
    //如果不为 null
    if (groupValue != null) {
        //尝试获取 lockEntry,看是否存在
        LockEntry lockEntry = groupValue.get(mq);
        if (lockEntry != null) {
            //如果存在,判断是否过期
            boolean locked = lockEntry.isLocked(clientId);
            if (locked) {
                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
            }

            return locked;
        }
    }

    return false;
}

Каждый клиент MQ будет периодически отправлятьLOCK_BATCH_MQзапрашивать и поддерживать все очереди полученных блокировок локально:

ProcessQueue.java:

//定时发送 **LOCK_BATCH_MQ** 间隔
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));

ConsumeMessageOrderlyService.java:

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

Блок-схема показана ниже:

image

ConsumeMessageOrderlyServiceПри выключении все очереди разблокированы:

public void shutdown() {
    this.stopped = true;
    this.scheduledExecutorService.shutdown();
    this.consumeExecutor.shutdown();
    if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        this.unlockAllMQ();
    }
}

причина проблемы

Наш клиент сюда регулярно отправляетLOCK_BATCH_MQИнтервал - это по умолчанию 20-е годы, а время блокировки бокового брокера также является по умолчанию 60-х.

Наша оркестрация контейнеров кластера использует k8s и имеет функцию миграции экземпляров. Когда кластер находится под высокой нагрузкой, автоматически масштабируются новые узлы (которые можно понимать как виртуальные машины), а также создаются и развертываются новые экземпляры службы. Когда нагрузка на некоторые сервисы в кластере низкая, некоторые экземпляры сервисов будут уменьшены. В настоящее время нет необходимости в таком количестве узлов, и некоторые узлы будут переработаны. Однако все еще есть экземпляры сервисов, которые нельзя уменьшено на переработанных узлах.В настоящее время эти экземпляры службы необходимо перенести на другие узлы.. Вот наш бизнес-пример, где это происходит.

Когда проблема возникла, это произошломигрировать, старый экземпляр закрыт, но не ждетConsumeMessageOrderlyService#shutdownисполнение,В результате блокировка не снимается активно, а ожидает истечения срока действия блокировки в 60 сек., прежде чем новый экземпляр получит блокировку очереди и начнет потребление..

задача решена

  1. В следующей версии добавьте изящную логику завершения работы для клиентов RocketMQ.
  2. Конфигурация всех экземпляров службы (клиенты RocketMQ)rocketmq.client.rebalance.lockIntervalСократите время сердцебиения (5 с), конфигурация RocketMQ Brokerrocketmq.broker.rebalance.lockMaxLiveTimeСократите время истечения (например, 15 с), но оставьте время истечения в 3 раза больше, чем время сердцебиения (3-кратная аксиома проектирования в кластерах).

Ищите «My Programming Meow» в WeChat, подписывайтесь на официальный аккаунт, чистите каждый день, легко улучшайте свои технологии и получайте различные предложения.