Всем привет, я Java-разработчик Saiyan, который любит поэзию, спасибо за внимание~ ┗( ▔, ▔ )┛. Я слышал, что лайки и чтение более совместимы~
Стихотворение дня:
少年恃险若平地,独倚长剑凌清秋。
-- [Тан·Гу Куан] "Путешествие с тремя трудностями"
Сегодня давайте взглянем на подводные камни механизма повторных попыток RocketMQ. О нет, я зашел не в ту студию, рекомендуется поставить лайк + в избранное.
Ладно, сядем в машину~
Далее только проектируется механизм повтора Потребителя, Производитель относительно прост, это простая повторная передача (конечно, есть и механизм аварийного переключения), поэтому я не буду обсуждать его сейчас...
Привлечение очков знаний
- Принцип механизма повтора ACK
- Очередь недоставленных писем (очередь DLQ)
несколько вопросов
- Что означает повтор сообщения?
- Потребительское потребление сообщений делится на кластерный режим (Cluster) и широковещательный режим (Broadcast).Будут ли оба режима выполнять повторную попытку сообщения?
- Какова стратегия повторной отправки сообщения?
- Правила времени задержки для повторной отправки сообщения?
- Что такое очередь недоставленных сообщений? Каковы характеристики?
- Каковы условия для Msg, чтобы присоединиться к очереди недоставленных сообщений?
Фон знаний
Мы знаем, что когда потребители извлекают сообщения и потребляют сообщения, они разделяются двумя классами:
- Сообщение по запросу: PullMessageService
- Использование сообщений: ConsumeMessageConcurrentlyService
процесс потребления сообщений
Ниже показан только код ключа
1. Предположим, мы извлекаем сообщение и готовим его к отправке в ConsumeMessageConcurrentlyService для использования.Следующий код будет скорректирован:
// ConsumeMessageConcurrentlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 假设未分页
if (msgs.size() <= consumeBatchSize) {
// 消息封装到里面
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
// 丢线程池消费
this.consumeExecutor.submit(consumeRequest);
}
}
}
2. Внутренний код ConsumeRequest
@Override
public void run() {
// 1、Consumer 中设计的回调方法
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
// 2、回调 Consumer 中的监听回调方法
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
hasException = true;
}
// 3、如果status 返回null,设置为 RECONSUME_LATER 类型
if (null == status) {
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 4、对返回的 status 结果进行处理
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
Какие? Что означает метод обратного вызова прослушивателя в Consumer?
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_topic");
// .... 省略部分代码
// 1、设置监听回调方法
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println(result);
// 2、返回成功表示消费成功,不会进行重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 3、返回 RECONSUME_LATER 表示消息需要重试(返回NULL也是一样)
// RECONSUME_LATER:通过单词我们知道是 稍后重新消费的意思,即重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
Метод обратного вызова — это анонимный класс, который вы написали выше. Думаю, ты должен знать, такой скромный (ー̀дー́)
3. Определите, нужно ли вам повторить попытку в соответствии с возвращенным статусом
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
switch (status) {
// 1、消费成功
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
break;
// 2、消费延迟
case RECONSUME_LATER:
ackIndex = -1;
break;
default:
break;
}
// 3、针对不同的消息模式做不同的处理
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 4、广播模式:如果消费是爱 ackIndex 为-1就会执行循环,可以看到只是打印日志,没有其它多余的操作
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
// 5、集群模式
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
// 6、RECONSUME_LATER 时,ackIndex 为-1,执行循环。CONSUME_SUCCESS 时不会执行循环
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// 7、能到这里说明是 RECONSUME_LATER 状态:回退Msg到Broker,也就是ACK(重试)
boolean result = this.sendMessageBack(msg, context);
// 8、ACK 可能会失败,需要记录失败的ACK
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// 9、存在ACK 失败的消息,将消息丢到线程池延迟 5s 重新消费
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 10、更新消费的偏移量:注意这里 CONSUME_SUCCESS 和 RECONSUME_LATER 都会更新
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
Основываясь на приведенном выше исходном коде, мы можем сделать следующие выводы:
1,Из шага 4 мы видим, что в широковещательном режиме, даже если потребитель не сможет потреблять, он не будет повторять попытку, а только распечатает журнал предупреждений.
2. Только сообщения, которые не могут быть использованы (без возврата CONSUME_SUCCESS становятся сбоями), должны отправлять ACK и повторять попытку.
3. Если ACK завершается сбоем (мне всегда кажется, что ACK здесь вызывается странно, в "RocketMQ Technology Insider" он становится сбоем ACK), мы вызываемповторная попытка не удаласьБар.
Если повторная попытка не удалась, она будет отложена на 5 с.повторное потребление(Он вернет метод обратного вызова в Consumer)
4. Когда сообщение будет использовано успешно или нет, смещение потребителя будет обновлено.
4. ConsumeMessageConcurrentlyService.sendMessageBack: подготовьтесь к запросу брокера
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
// 1、注意这里:默认为0,其实一直都是0,其它地方没有修改。这表示RocketMQ延迟消息的 延迟级别
int delayLevel = context.getDelayLevelWhenNextConsume();
try {
// 2、发送给Broker
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
Какие? Вы не знаете, что означает уровень задержки отложенных сообщений RocketMQ? Т_Т"
Пример задержки на официальном сайте RocketMQ
Мы знаем, что уровень задержки RocketMQ делится на 18 уровней, delayLevel от 1 до 18, каждое число соответствует времени задержки.
Время задержки следующее:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Например: delayLevel=1, что означает задержку в 1 с.
Тогда delayLevel=4, означает ли задержка 30 секунд? Ага, ты тоже научился быстро отвечать, вот что значит. Ты действительно умный. (о゚▽゚)о
Брокер концы повторной попытки
Следующий код предназначен для исходного кода Broker, и читателям необходимо загрузить исходный код RocketMQ, чтобы увидеть его.
Этот метод представляет собой код, который обрабатывает повторный запрос Потребителя, и код в методе относительно длинный. В основном сделайте следующее:
- Другие темы новостей
"%RETRY%"+ group
, вычислить queueId (очередь повторных попыток, количество очередей равно 1) - Если сообщение повторяется >= 16 раз (по умолчанию). Продолжайте менять тему сообщения наочередь недоставленных сообщенийТема:
"%DLQ%" + group
, очередь потребления равна 1 (очередь недоставленных сообщений имеет только одну очередь потребления) - если не статьмертвая буква, чтобы вычислить уровень задержки сообщения
- Скопируйте исходное сообщение, повторно сгенерируйте сообщение, добавьте новое сообщение в BrokerController, а затем сохраните его в CommitLog для хранения (что? Вы не знаете, что такое CommitLog? Я напишу статью о внутренней структуре хранения RocketMQ в следующий выпуск)
- Новое сообщение будет иметь новый идентификатор сообщения
- Недоставленное письмо: сообщение имеет новое название темы:
"%RETRY%"+ group
Сохранить в CommitLog каксообщение с задержкой - Мертвая буква: с
"%DLQ%" + group
Имя темы, хранящееся в CommitLog: сообщения, хранящиеся в очереди недоставленных сообщений, не будут использоваться потребителем.
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request){
// 1、新的Topic名:"%RETRY%"+ group
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
// 重试队列数为1
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
// 2、都是为0
int delayLevel = requestHeader.getDelayLevel();
// 3、消息重试次数:重试几次这里存的就是低几次
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
// 4、如果超过最大重试次数(默认为16)
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 5、更改Topic 名为死信队列名:"%DLQ%" + group
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
// 6、默认死信队列数为1
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
} else {
// 7、delayLevel 其实都为0,所以这里就相当于是重试次数 +3
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
// 8、新建消息,准备存到CommitLog中作为新消息
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setQueueId(queueIdInt);
// 8-1、重试次数+1。新消息被消费者消费时就会传上来,到第4步进行比较
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
// 9、作为新消息存到CommitLog中
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
Что такое очередь недоставленных сообщений (очередь DLQ)?
Обратитесь в блог, не делайте колеса
Резюме:
- Отдельная очередь (DLQ) в брокере, в которой хранятся сообщения, которые не были успешно обработаны после 16 повторных попыток на стороне потребителя.
- Очередь: только разрешение на запись, без разрешения на чтение. Следовательно, Потребитель не может повторно использовать его, и его можно только вручную вмешать и повторно доставить (работает в Rocket-MQ-Console).
- В очереди DLQ ТЕМА сообщения переименовывается в:
"%DLQ%"
+groupName
- Очередь DLQ на самом деле (папка consumequeue
"%DLQ%"
+groupName
Очередь под названной папкой темы)
Какие? Что, черт возьми, за папка "consumerqueue"? Жди меня..., напиши сразу структуру хранения сообщений RocketMQ и все поймешь
Механизм задержки повторного сообщения
Мы говорим, что после того, как сообщение о повторной попытке отправлено Брокеру, оно рассматривается как новое.сообщение с задержкойХранится в CommitLog, когда сообщение достигает точки времени потребления, оно будет повторно использовано Потребителем.
Сообщение повторяется 16 раз, прежде чем оно будет удалено.очередь недоставленных сообщений, он не будет потребляться.
Как долго остальные 15 сообщений задерживаются каждый раз?
Мы видим, что приведенный выше исходный код действительно просматривается: на уровень задержки сообщения влияет количество перепланировок. Чем больше повторных попыток, тем больше задержка.
delayLevel = 3 + msgExt.getReconsumeTimes();
Конкретная задержка повтора выглядит следующим образом:Изображение из облака Alibaba
Суммировать
Вернемся к нескольким вопросам, с которых мы начали:
- Что означает повтор сообщения?
- Потребительское потребление сообщений делится на кластерный режим (Cluster) и широковещательный режим (Broadcast).Будут ли оба режима выполнять повторную попытку сообщения?
- Какова стратегия повторной отправки сообщения?
- Правила времени задержки для повторной отправки сообщения?
- Что такое очередь недоставленных сообщений? Каковы характеристики?
- Каковы условия для Msg, чтобы присоединиться к очереди недоставленных сообщений?
Что означает повтор сообщения?
Чтобы обеспечить высокую доступность, RocketMQ, если Потребитель не может использовать сообщение (функция обратного вызова не возвращаетCONSUME_SUCCESS
), потребителю необходимо повторно использовать сообщение.
Потребительское потребление сообщений делится на кластерный режим (Cluster) и широковещательный режим (Broadcast).Будут ли оба режима выполнять повторную попытку сообщения?
Режим вещания запускается только сжурнал предупрежденийВ виде сообщения об ошибке потребления записи и не будет повторяться
Кластерный режим реализует только механизм повторной отправки сообщения.
Какова стратегия повторной отправки сообщения?
Сторона брокера использует метод задержки сообщения для повторного использования потребителем.
Правила времени задержки для повторной отправки сообщения?
Каковы условия для Msg, чтобы присоединиться к очереди недоставленных сообщений?
После повторной попытки сообщения 16 раз Потребитель не был успешно использован.
Наконец
Если есть какие-либо ошибки, пожалуйста, дайте мне знать в разделе комментариев. Мы продолжим обновлять статьи, связанные с RocektMQ, в будущем Добро пожаловать, чтобы оставить сообщение в области комментариев~
Последние статьи будут сначала обновляться в WeChat, добро пожаловать в преследование утки~ (ノ゚▽゚)ノ\