Возможности RocketMQ и интервью (часть 1)

RocketMQ

1. Последовательное сообщение

RocketMQ решает проблему упорядочения сообщений по-своему:
RocketMQ определяет, в какую очередь отправляется сообщение, путем опроса всех очередей (стратегия балансировки нагрузки). В соответствии с различными службами идентификатор службы может использоваться в качестве очереди вычислений, чтобы сообщения с одним и тем же идентификатором службы последовательно отправлялись в одну и ту же очередь.После получения информации о маршрутизации очередь будет выбрана в соответствии с реализованным алгоритмом. by MessageQueueSelector Получен тот же OrderId Определенно та же очередь.

Во-вторых, дубликат сообщения

В случае нестабильной сети у RocketMQ возникнет проблема дублирования сообщений:

Дублирование сообщения при отправке

  • Когда сообщение было успешно отправлено на сервер и сохраняется, происходит сбой в сети или клиент не работает, в результате чего сервер не отвечает клиенту. Если производитель понимает, что сообщение не удалось отправить, и пытается отправить сообщение снова, потребитель получит два сообщения с одинаковым содержимым и одним и тем же идентификатором сообщения.

Дублирование сообщения о доставке

  • В сценарии потребления сообщения сообщение было доставлено потребителю и бизнес-обработка завершена.Когда клиент отправляет ответ на сервер, сеть отключается. Чтобы гарантировать, что сообщение будет потреблено хотя бы один раз, сервер очереди сообщений RocketMQ попытается снова доставить ранее обработанное сообщение после восстановления сети, а потребитель впоследствии получит два сообщения с одинаковым содержимым и одинаковым Идентификатор сообщения.

Дублирование сообщений при балансировке нагрузки

  • Когда брокер или клиент по очереди сообщений ROCKETMQ перезагружается, расширяется или сжимается, будет запущен ребакантринг, и потребители могут получать дубликаты сообщения в это время.

Основной причиной дублирования сообщений является недоступность сети. Этой проблемы нельзя избежать, пока происходит обмен данными по сети. Таким образом, решение этой проблемы заключается в том, чтобы обойти его. Тогда возникает вопрос: если потребитель получает два одинаковых сообщения, что ему делать?

  1. Бизнес-логика обработки сообщений на стороне потребителя остается идемпотентной.
  2. Убедитесь, что каждое сообщение имеет уникальный номер, и убедитесь, что обработка сообщения прошла успешно, и журнал таблицы дедупликации отображается одновременно.

Пункт 1 легко понять, пока поддерживается идемпотентность, неважно, сколько придет дублирующихся сообщений, окончательный результат будет одним и тем же.
Второй принцип заключается в использовании таблицы журнала для записи идентификатора сообщения, которое было успешно обработано.Если идентификатор вновь пришедшего сообщения уже есть в таблице журнала, то это сообщение не будет обработано.
Первое решение, очевидно, должно быть реализовано на стороне потребителя, и оно не относится к функции, которую должна реализовать система сообщений.
Статья 2 может быть реализована система обмена сообщениями, бизнес-конце может быть достигнута. Вероятность дублирующих сообщений в нормальных обстоятельствах на самом деле очень мала, если это реализуется системой обмена сообщениями, она, безусловно, будет очень доступна пропускной способностью и влиянием системы обмена сообщениями, настолько наилучшим образом иметь дело со своими собственными сообщениями от бизнеса. Причина RocketMQ не решает проблему дублирующих сообщений.

RocketMQ Нет, не гарантирует, что сообщение не повторяется, если ваш бизнес не нужно строить дублирующие сообщения, вам нужно идти тяжело на деловой стороне.

Как не акцентировать внимание на бизнес-стороне? Принцип очень прост, шаги следующие:

  1. Запишите msgID каждого сообщения
  2. Когда приходит новое сообщение, проверьте, был ли записан msgID сообщения, если да, отбросьте его, в противном случае используйте его Затем сохраните msgID в кэше.Если это сообщение будет отправлено повторно, проверьте, есть ли значение ключа в redis.Если оно существует, сообщение не будет обработано.

3. Деловые новости

3.1 Введение концепции

Сообщение транзакции: очередь сообщений RocketMQ обеспечивает распределенную функцию транзакции, аналогичную X / Open XA, и возможная согласованность распределенных транзакций может быть достигнута благодаря сообщению Queue Transectmq. Semi-Message: сообщение, которое не может быть доставлено временно. Отправитель успешно отправил сообщение на сервер RocketMQ по очереди сообщений, но сервер не получил вторичное подтверждение сообщения от производителя. В это время сообщение отмечается как «временно недовериемость». «Состояние, сообщения в этом состоянии являются полуобусину. Проверка сообщений: Благодаря отключателям сети перезапускается приложение производителей, перезапускается и т. Д., Вторичное подтверждение сообщения транзакции теряется. Когда сервер rocketmq roadeTmq сообщений находит, что сообщение «Половина сообщения» в течение длительного времени через сканирование, нужно Чтобы принять инициативу, чтобы отправить сообщение в сообщение. Производитель запрашивает окончательный статус (Commun или Rollback) сообщения, и этот процесс является сообщением об ошибках.

3.2 Применимые сценарии

Примеры применимых сценариев для сообщений о транзакциях: В процессе размещения заказа через корзину для покупок запись пользователя находится в системе корзины покупок, а запись заказа транзакции - в системе транзакций.Данные между двумя системами должны быть в конечном счете согласованы, что может быть обработано через сообщения о транзакциях. После того, как система транзакций размещает заказ, она отправляет сообщение заказа транзакции в очередь сообщений RocketMQ, а система корзины покупок подписывается на сообщение заказа транзакции очереди сообщений RocketMQ, выполняет соответствующую бизнес-обработку и обновляет данные корзины покупок.

3.3 Как использовать

Интерактивный процесс

Процесс взаимодействия с транзакционным сообщением очереди сообщений RocketMQ выглядит следующим образом: сообщение о транзакции

в:

  1. Отправитель отправляет сообщения на сервер очереди сообщений RocketMQ.
  2. После того, как сервер успешно удерживается, сервер успешно отправляет отправителю подтверждающее сообщение ACK, и сообщение представляет собой половину сообщения.
  3. Отправитель начинает выполнять локальную логику транзакции.
  4. Отправитель отправляет вторичное подтверждение (Commit или Rollback) на сервер в соответствии с результатом выполнения локальной транзакции, и сервер пометит полусообщение как доставляемое после получения статуса Commit, и подписчик в конечном итоге получит сообщение; сервер получает Rollback Состояние удаляет половину сообщения, и подписчик не примет сообщение.
  5. В особом случае отключения сети или перезапуска приложения вторичное подтверждение, отправленное на шаге 4 выше, в конце концов не достигает сервера, и сервер инициирует проверку сообщения для сообщения через фиксированное время.
  6. После того, как отправитель получит ответное сообщение, ему необходимо проверить окончательный результат выполнения локальной транзакции соответствующего сообщения.
  7. Отправитель снова отправляет второе подтверждение в соответствии с окончательным состоянием локальной транзакции, полученным в результате проверки, и сервер по-прежнему обрабатывает половинное сообщение в соответствии с шагом 4.

инструкция: Сообщение транзакции передает шаги соответствия 1, 2, 3, 4, проверка сообщений транзакции соответствует шагам 5, 6, 7.

3.4 Примечания

  1. Идентификатор группы сообщения о транзакции не может использоваться совместно с идентификатором группы других типов сообщений. В отличие от других типов сообщений, сообщения о транзакциях имеют механизм проверки.Во время проверки сервер очереди сообщений RocketMQ будет запрашивать клиента в соответствии с идентификатором группы.
  2. При создании производителя сообщений о транзакциях с помощью ONSFactory.createTransactionProducer необходимо указать класс реализации LocalTransactionChecker для обработки проверки сообщений о транзакциях в нештатных случаях.
  3. После отправки сообщения транзакции и завершения локальной транзакции в методе execute могут быть возвращены следующие три состояния:
    • TransactionStatus.CommitTransaction фиксирует транзакцию, позволяя подписчикам использовать сообщение.
    • TransactionStatus.RollbackTransaction откатывает транзакцию, сообщение отбрасывается и не может быть использовано.
    • TransactionStatus.Unknow временно не может судить о статусе Ожидается, что сервер очереди сообщений RocketMQ проверит сообщение отправителю через фиксированное время.
  4. Максимальное время первого просмотра сообщения можно установить для каждого сообщения следующими способами:
Message message = new Message();
 // 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
 message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
// 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动0~5秒;如第一次回查后事务仍未提交,后续每隔5秒回查一次。

4. Как продюсер отправляет сообщения

4.1 Надежная синхронная передача

Синхронная отправка относится к методу связи, при котором отправитель сообщения отправляет данные и отправляет следующий пакет данных после получения ответа от получателя.

Сценарии применения: Этот метод имеет широкий спектр сценариев применения, таких как электронные письма с важными уведомлениями, регистрационные SMS-уведомления и маркетинговые SMS-системы.

4.2 Надежная асинхронная коробка передач

Асинхронная отправка означает, что после того, как отправитель отправит данные, он не ждет, пока получатель отправит ответ, а затем отправляет следующий пакет данных. Асинхронная отправка MQ требует, чтобы пользователь реализовал интерфейс обратного вызова для асинхронной отправки (SendCallback). После отправки сообщения отправителю сообщения не нужно ждать ответа от сервера, чтобы вернуться и отправить второе сообщение. Отправитель получает ответ сервера через интерфейс обратного вызова и обрабатывает результат ответа.

Сценарии применения: асинхронная отправка обычно используется в бизнес-сценариях, где ссылка занимает много времени и зависит от времени отклика RT, например, уведомление о запуске службы перекодирования после загрузки видео пользователя и уведомление о отправке результата перекодирования после транскодирование завершено.

4.3 Односторонняя (Oneway) отправка

Односторонняя (Oneway) отправка характеризуется тем, что отправитель отвечает только за отправку сообщений, не ждет ответа сервера и не срабатывает функция обратного вызова, то есть только отправляет запрос и не ждет ответа. Процесс отправки сообщений таким образом занимает очень короткое время, обычно на уровне микросекунд.

Сценарии применения: Подходит для некоторого потребления времени, но сценарий для требований надежности, таких как сбор журнала. В следующей таблице приведены характеристики и основные различия трех.

4.4 Сравнение трех методов

5. Новости в реальном времени

RocketMQ использует режим длительного опроса + PULL для обеспечения сохранения сообщений.

6. Подписка на новости

Очередь сообщений RocketMQ поддерживает следующие два метода подписки:

Кластерная подписка: все потребители, идентифицированные одним и тем же идентификатором группы, одинаково разделяют сообщения о потреблении. Например, если тема имеет 9 сообщений, а идентификатор группы имеет 3 экземпляра потребителя, то в режиме потребления кластера каждый экземпляр распределяется равномерно, и потребляются только 3 сообщения.

 // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

подписка на трансляцию: все потребители, идентифицированные одним и тем же идентификатором группы, будут использовать сообщение один раз. Например, если тема имеет 9 сообщений, а идентификатор группы имеет 3 экземпляра потребителя, то каждый экземпляр будет потреблять 9 сообщений в режиме широковещательного потребления.

 // 广播订阅方式设置
 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

7. Сообщения о времени и сообщения с задержкой

  • Сообщение о времени: источник отправляет сообщение на сервер RocketMQ из очереди сообщений, но не ожидает, что сообщение будет доставлено немедленно, а откладывает доставку потребителю для потребления в определенное время после текущей точки времени. сообщение о времени.
  • Отложенное сообщение: Производитель отправляет сообщение на сервер RocketMQ из очереди сообщений, но не ожидает, что сообщение будет доставлено немедленно. Вместо этого оно доставляется Потребителю для потребления после определенной задержки. Сообщение является задержанным сообщением.

Сообщение о времени и задержкам. Есть некоторые различия в конфигурации кода, но окончательное достижение одинаково: сообщение не сразу доставляется после отправки в службу очереди сообщений RocketMQ, но в соответствии с задержкой атрибута в соответствии с атрибутом Сообщение дают потребителям.

Временные сообщения и сообщения с задержкой подходят для некоторых из следующих сценариев:

  • Существуют требования к временному окну для создания и потребления сообщений: например, в транзакции электронной коммерции, когда заказ закрывается без оплаты, при создании заказа будет отправлено отложенное сообщение. Это сообщение будет доставлено потребителю через 30 минут.После получения этого сообщения потребителю необходимо определить, был ли оплачен соответствующий заказ. Если оплата не будет завершена, заказ будет закрыт. Игнорировать, если оплата уже была произведена.
  • Запускайте некоторые запланированные задачи с помощью сообщений, например, отправляйте пользователю напоминание в фиксированный момент времени.