написать впереди
В своей повседневной разработке мы часто сталкиваемся с такими бизнес-сценариями «запуска события через определенный промежуток времени». Такие как:
- Заказ будет автоматически отменен, если вы не оплатите его в течение 30 минут после размещения заказа на платформе электронной коммерции.
- Красные конверты автоматически возвращаются, если они не получены в течение 24 часов.
Общие решения
1. Обычное сканирование
Время срабатывания события записывается заранее, и запланированная задача не проверяет базу данных для сравнения времени срабатывания.
Этот метод не работает в режиме реального времени.По мере увеличения частоты выполнения запланированных задач запуск в реальном времени будет улучшаться, но частое сканирование увеличивает нагрузку на базу данных, что также является самым простым способом.
2. JDK-решение
Таймер Timer, предоставленный jdk, очередь задержки DelayQueue.
Этот метод можно использовать в автономной среде с низкими требованиями к надежности, так как в памяти jvm существуют и задачи, и очереди, поэтому распределенная среда не поддерживается, и система не может быть восстановлена после внезапного простоя.
3. Отложенное сообщение промежуточного программного обеспечения сообщений
Производитель доставляет задержанное сообщение, а потребитель может использовать сообщение через указанное время, поэтому для развития нашего бизнеса нам нужно обращать внимание только на сообщение, срок действия которого только что истек.
В мире существует много зрелых промежуточных программ для сообщений.Как Java-разработчик, я предпочитаю RocketMQ с открытым исходным кодом, потому что другие промежуточные программы для сообщений являются для меня черным ящиком.В RocketMQ, если у вас возникнут проблемы и сомнения, вы даже можете взять исходный код для отладки.
Знакомство с задержанными сообщениями RocketMQ
Используйте официального производителя для доставки сообщения api
Как видите, предоставленный нам официальный метод таков:
setDelayTimeLevel()
, не является пользовательским временем задержки.
Я был так озадачен дизайном этого места, что придумал эту статью. Для уровня задержки «5» на рисунке это просто случайный уровень (соответствующий одной минуте), который будет подробно проанализирован позже.
После завершения доставки немедленно откройте консоль для просмотра
Сайты потребления, соответствующие четырем очередям в теме, не изменились, поэтому потребители, подписавшиеся на тему в это время, не могут немедленно использовать сообщение.
после ожидания задержки
Я обнаружил, что сообщение появилось в этой теме, и время было всего через одну минуту после моего времени доставки.
догадка
Из внешнего представления задержанное сообщение не доставляется напрямую в соответствующий топик, а испытало какой-то «транзит» между производителем и топиком, и производитель доставляет задержанное сообщение на эту «транзитную станцию», есть и другие задачи, которые вынимают просроченные сообщения с этой "пересадочной станции" и отправляют их в тему.
Отладочный анализ исходного кода
Совет: здесь, если вы следуете продюсеру
send()
Метод может быть нелегко найти, есть много уровней, и уровень задержки настройкиMessage
КатегорияsetDelayTimeLevel()
, обратный толчок должен иметь место для вызоваMessage
серединаgetDelayTimeLevel()
, названный в нескольких местахgetDelayTimeLevel()
Когда в метод помещается точка останова, можно найти логику обработки задержки.
Доставка производителяCommitLog.java
Перебрасывать обычные сообщения с уровнем задержки непосредственно на «ретрансляционную станцию» —SCHEDULE_TOPIC_XXXX
специальная очередь
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...
if (msg.getDelayTimeLevel() > 0) {
//延时消息处理逻辑
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
//如果延时级别大于最大值,则置为最大值
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//一个名为“SCHEDULE_TOPIC_XXXX”的特殊topic常量名
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//队列号为delayLevel - 1(延时级别减1)
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//将消息的真实topic和queueId设置为其他属性保存
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//重新设置消息的topic为“SCHEDULE_TOPIC_XXXX”
msg.setTopic(topic);
msg.setQueueId(queueId);
}
...
}
это
SCHEDULE_TOPIC_XXXX
Специальную тему "транзитная станция" мы не можем просмотреть в консоли консоли, но она существует в нашем постоянном хранилище каталогов, как показано на рисунке, номер очереди 4, что соответствует уровню задержки минус 1 (5-1 =4), в соответствии с нашим анализом исходного кода.
Отложенная логическая обработкаScheduleMessageService.java
1. Установите соответствующее соотношение между уровнем задержки и временем задержки.parseDelayLevel()
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
//不同延时时间的字符串
//String levelString = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
//存放延时级别与延时时长的对应关系
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
2. Настройте отдельные круговые задачи для каждого уровня задержкиstart()
public void start() {
//cas乐观锁保证线程安全
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
//遍历延时级别
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//为每一个延时级别创建一个分发延时消息任务,首次启动延迟1s
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
/* 持久化任务 */
...
}
}
Рассылать задержанные сообщенияDeliverDelayedMessageTimerTask.java
По истечении срока задержанного сообщения «транзитной станции» оно преобразуется в обычное сообщение и доставляется в целевую тему:
public void executeOnTimeup() {
/* 从SCHEDULE_TOPIC_XXXX中获取对应特定延时等级的消息 */
...
//当前时间
long now = System.currentTimeMillis();
//消息经过延时后应该被发送的真实时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//还需要等待的时间
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
//已经到期无需等待
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//将延时消息转化为普通消息(还记得上文`CommitLog.java`中将普通消息转化为了延时消息么)
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//将消息发送至目的地topic
PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
//错误重试
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
//countdown>0,即消息还未过期,即还需要等待countdown毫秒
//延期countdown毫秒进行递归,这里设计的很巧妙,直接延期到消息发送的时间点,就不用反复判断是否过期了
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
...
// 没有找到延时消息,则延时0.1s再次启动该定时器递归
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
В итоге
Суммировать
- Между производителем и темой назначения добавляется слой «транзитной станции» - названный
SCHEDULE_TOPIC_XXXX
тема - В этой специальной теме по умолчанию 18 очередей, соответствующих разным уровням задержки.
- Каждая очередь в теме будет иметь задачу для определения того, истек ли срок действия сообщения в очереди.
характеристика
- 1. Все сообщения разделены по уровню задержки, что повышает производительность поиска файлов.
Преимущества: с большим количеством каталогов определенно легче находить файлы, а скорость позиционирования при чтении и записи файлов была улучшена.
- 2. Для каждого уровня каталога разделов поддерживается упорядоченная очередь в соответствии со временем задержки от малого к большому.
Преимущество: Для одного и того же уровня задержка вновь добавленного сообщения должна быть наибольшей, и оно помещается в конец очереди.Нам нужно только обратить внимание на первое в очереди, потому что первое истекает раньше (гарантированная задержка срабатывает в режиме реального времени). Прочитайте заголовок очереди и добавьте новые сообщения в конец (последовательное чтение и запись повышает производительность).
догадка
- Если поддерживается произвольная точность
Независимо от того, какой стандарт используется для секционирования сообщений, нет гарантии, что сообщения в секционированной очереди читаются и записываются последовательно (в этой файловой модели и никаком другом промежуточном программном обеспечении не используется).
В итоге:Отложенное сообщение RocketMQ учитывает производительность чтения и записи и характер запуска задержки в реальном времени после сохранения сообщений с различными задержками, а также вводит «уровень задержки» — решение, которое уравновешивает производительность и производительность в реальном времени.
Обсуждать
Я не большой технический эксперт.Я могу только догадываться о преимуществах этого дизайна, исходя из логики кода, и есть ли другие проекты, которые не спроектированы таким образом, и возможно ли это.Если у вас есть другие решения, пожалуйста, оставить сообщение для обсуждения.