«Эта статья участвовала в мероприятии Haowen Convocation Order, щелкните, чтобы просмотреть:Двойные заявки на внутреннюю и внешнюю стороны, призовой фонд в 20 000 юаней ждет вас, чтобы бросить вызов!"
Проблема с задержкой последовательного потребления RocketMQ
Предыстория и явление проблемы
Прошлой ночью я получил сигнал тревоги приложения и обнаружил, что определенное сообщение о потреблении бизнеса в Интернете было задержано более чем на 54 секунды (интервал от сообщения, отправленного в MQ, до времени его использования):
2021-06-30T23:12:46.756 message processing is incredibly delayed! (Current delay time: 54725, incredible delay count in 10 seconds: 5677)
Глядя на мониторинг RocketMQ, обнаруживается, что действительно существует относительно большое отставание сообщений:
Просмотр пользователей тем из RocketMQ-Console:
Для этой темы бизнес-требованиянужно заказатьиз. Поэтому при отправке указывайтеБизнес-ключ, а при потреблении с помощьюсхема последовательного потребления.
Мы используем кластер RocketMQ с тремя брокерами.Для этой темы у каждого брокера есть 8 ReadQueues и WriteQueues. Вот краткое упоминание о значении ReadQueue и WriteQueue:
В RocketMQ,При отправке сообщения используйте количество WriteQueues для возврата информации о маршрутизации.,Когда сообщение потребляется, информация о маршрутизации возвращается в соответствии с количеством ReadQueues.. На уровне физического файла только WriteQueues создают файлы. Например: установите WriteQueueNum = 8, ReadQueueNum = 4, будет создано 8 папок, представляющих 8 очередей 0 1 2 3 4 5 6 7, но когда сообщение используется, информация о маршрутизации возвращает только 4, в конкретном запросе. сообщение отправляется, потребляются только сообщения в четырех очередях 0 1 2 3, а 4 5 6 7 не потребляются вообще. И наоборот, если вы установите WriteQueueNum = 4 и ReadQueueNum = 8, при создании сообщений будут создаваться только сообщения в 0 1 2 3, а при потреблении сообщений они будут потребляться из всех очередей 0 1 2 3 4 5 6 7, конечно 4 В 5 6 7 вообще нет сообщений.Предполагая, что потребление является групповым потреблением, в группе есть два потребителя.На самом деле только первый потребитель фактически потребляет сообщения (0 1 2 3), а второй второго потребителя вообще нет.Невозможно использовать сообщение (4 5 6 7).Как правило, мы установим эти два значения одинаковыми., они будут установлены по-разному только тогда, когда количество очередей в топике необходимо уменьшить.
анализ проблемы
Первое, что приходит на ум, это то, чтоПотребительская нить застряла?Потоки застревают, как правило, потому что:
- Остановка мира случилась:
- GC причины
- Вызвано другими причинами точки сохранения (такими как jstack, синхронизированный вход в точку сохранения и т. д., см. эту мою статьюСвязано с JVM - комплексное решение SafePoint и Stop The World)
- Поток, обрабатывающий сообщение, занимает слишком много времени, блокировка может не быть получена, и он может застрять в каком-то IO.
коллекция в то времяJFR(Для JFR, пожалуйста, обратитесь к моей другой серииПолное решение JFR),Обнаружить:
- не произошло за это времядолгий застойGC и другие события Safepoint Stop-the-world:
- В это время поток припаркован, а отображение стекаУ потребляющего потока нет сообщений для потребления:
Поскольку проблем с приложением нет, давайте посмотрим, есть ли проблемы с RocketMQ. Нас интересуют общие журналы RocketMQ Broker:
- Статистика потребления времени сохранения сообщения, если здесь возникает исключение, нам необходимо настроить параметры, связанные с Java MMAP, см.:
- Исключение сохранения сообщения, проверьте storeerr.log
- Исключение блокировки, проверьте lock.log
Так к какому брокеру мне обратиться? Как упоминалось ранее, hashKey указывается при отправке в эту тему, и мы можем определить, какой брокер находится через hashKey сообщения:
int hashCode = "我们的hashKey".hashCode();
log.info("{}", Math.abs(hashCode % 24));
Мы нашли hashKey сообщения.По вышеприведенному коду получается 20, то есть очередь 20. Согласно предыдущему описанию,Мы знаем, что у каждого брокера 8 очередей, а 20 соответствует очереди на брокере-2, то есть,брокер-2 queueId = 5 эта очередь. Давайте рассмотрим проблему с расположением логов на брокере-2.
Мы обнаружили, что в lock.log есть исключения, как показано ниже, похожих много, и они длились около 54 с, что согласуется с временем парковки потока и задержкой сообщения:
2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: 消费group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic=消息topic, brokerName=broker-2, queueId=5]
Этот журнал означает,10.238.18.122@29
Попытка этого экземпляра заблокировать очередь с идентификатором очереди = 5 не удалась, поскольку10.238.18.6@29
держит этот замок. Так почему же это происходит?
Принцип последовательного потребления RocketMQ с несколькими очередями
Если RocketMQ хочет добиться последовательного потребления нескольких очередей, сначала необходимо указать hashKey. Сообщение будет помещено в определенную очередь с помощью hashKey. Когда потребители используют эту очередь, если указано последовательное потребление, дапотребление одного потока, так что порядок в той же очереди гарантирован.
Так как же обеспечить, чтобы каждая очередь использовалась одним потоком? Каждый брокер поддерживает один:
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
ОнConcurrentMap<消费组名称, ConcurrentHashMap<消息队列, 锁对象>>
. Объект блокировки LockEntry включает в себя:
//读取 rocketmq.broker.rebalance.lockMaxLiveTime 这个环境变量,默认 60s
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
"rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
static class LockEntry {
//RocketMQ 客户端唯一 id
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
//省略getter setter
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
public boolean isExpired() {
// 在 REBALANCE_LOCK_MAX_LIVE_TIME 这么长时间后过期
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
Клиент RocketMQ отправляетLOCK_BATCH_MQКогда запрос сделан к брокеру, брокер инкапсулирует запрос клиента как LockEntry и попытается обновить карту.Если обновление прошло успешно, блокировка получена, а в случае сбоя блокировка не получена. Подробная логика обновления брокера (Если вам интересно, вы можете просмотреть его, или вы можете пропустить его напрямую, не влияя на понимание.Есть изображения, которые легко понять.):
public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
//判断没有已经锁住
if (!this.isLocked(group, mq, clientId)) {
try {
//获取锁,这个锁是实例内的,因为每个 broker 维护自己的队列锁表,并不共享
this.lock.lockInterruptibly();
try {
//尝试获取,判断是否存在,存在就判断是否过期
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
LockEntry lockEntry = groupValue.get(mq);
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
group,
clientId,
mq);
}
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
return true;
}
String oldClientId = lockEntry.getClientId();
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return true;
}
//这里就是我们刚刚看到的日志
log.warn(
"tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
group,
oldClientId,
clientId,
mq);
return false;
} finally {
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
} else {
}
return true;
}
//判断是否是已经锁住了
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
//通过消费组名称获取
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
//如果不为 null
if (groupValue != null) {
//尝试获取 lockEntry,看是否存在
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
//如果存在,判断是否过期
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
Каждый клиент MQ будет периодически отправлятьLOCK_BATCH_MQзапрашивать и поддерживать все очереди полученных блокировок локально:
//定时发送 **LOCK_BATCH_MQ** 间隔
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
ConsumeMessageOrderlyService.java
:
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
Блок-схема показана ниже:
ConsumeMessageOrderlyService
При выключении все очереди разблокированы:
public void shutdown() {
this.stopped = true;
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
}
причина проблемы
Наш клиент сюда регулярно отправляетLOCK_BATCH_MQИнтервал - это по умолчанию 20-е годы, а время блокировки бокового брокера также является по умолчанию 60-х.
Наша оркестрация контейнеров кластера использует k8s и имеет функцию миграции экземпляров. Когда кластер находится под высокой нагрузкой, автоматически масштабируются новые узлы (которые можно понимать как виртуальные машины), а также создаются и развертываются новые экземпляры службы. Когда нагрузка на некоторые сервисы в кластере низкая, некоторые экземпляры сервисов будут уменьшены. В настоящее время нет необходимости в таком количестве узлов, и некоторые узлы будут переработаны. Однако все еще есть экземпляры сервисов, которые нельзя уменьшено на переработанных узлах.В настоящее время эти экземпляры службы необходимо перенести на другие узлы.. Вот наш бизнес-пример, где это происходит.
Когда проблема возникла, это произошломигрировать, старый экземпляр закрыт, но не ждетConsumeMessageOrderlyService#shutdownисполнение,В результате блокировка не снимается активно, а ожидает истечения срока действия блокировки в 60 сек., прежде чем новый экземпляр получит блокировку очереди и начнет потребление..
задача решена
- В следующей версии добавьте изящную логику завершения работы для клиентов RocketMQ.
- Конфигурация всех экземпляров службы (клиенты RocketMQ)
rocketmq.client.rebalance.lockInterval
Сократите время сердцебиения (5 с), конфигурация RocketMQ Brokerrocketmq.broker.rebalance.lockMaxLiveTime
Сократите время истечения (например, 15 с), но оставьте время истечения в 3 раза больше, чем время сердцебиения (3-кратная аксиома проектирования в кластерах).
Ищите «My Programming Meow» в WeChat, подписывайтесь на официальный аккаунт, чистите каждый день, легко улучшайте свои технологии и получайте различные предложения.