Дело в том, что я просто случайно использовал RocketMq в своем проекте по написанию резюме, я только что получил сообщение от Boss, показывающее, что это рекрутер Али, и я только что отправил это резюме, всего несколько дней спустя, Раздался телефонный звонок, и первый вопрос, который возник, что вы говорили о принципе RocketMq, я был ошарашен, в голове было пусто...
После этого я также задумался. Я действительно использовал RocketMq для разработки приложений для производителей и потребителей. В RocketMq не так много основных вещей. Просто поймите память. Если вы мне не верите, читайте дальше:
Что такое RocketMQ?
Промежуточное ПО для сообщений модели распределенной очереди на чистом Java, отличающееся высокой доступностью, высокой надежностью, высокой производительностью в реальном времени и низкой задержкой. (Просто запомните это предложение)
Какова функция RocketMq
Сначала ответьте на основные функции промежуточного программного обеспечения сообщений.
- 1. Разделение бизнеса: это также модель сообщений публикации и подписки. Производитель отправляет инструкцию в MQ, а затем последующие потребители, которые подписываются на этот тип инструкции, получат инструкцию и выполнят соответствующую логику Весь процесс не имеет ничего общего с конкретным бизнесом и абстрагируется в процесс отправка инструкций, хранение инструкций и использование инструкций.
- 2. Сброс пиков фронтенда: запросы, инициированные фронтендом, не могут быть обработаны бэкендом за короткий промежуток времени, они могут накапливаться в MQ, а бэкенд обрабатывается в определенном порядке. Так реализована система seckill.
Поговорим об особенностях RocketMq:
- 3. Емкость накопления 100 миллионов сообщений уровня, совокупная емкость миллионов сообщений в одной очереди.
- 4. Высокая доступность. Сервер-посредник поддерживает синхронную двойную запись с несколькими ведущими и несколькими ведомыми и асинхронную репликацию с несколькими ведущими и ведомыми, при которой синхронная двойная запись может гарантировать, что сообщения не будут потеряны.
- 5. Высокая надежность: у производителя есть три способа отправить сообщение брокеру: синхронный, асинхронный и односторонний.И синхронный, и асинхронный могут обеспечить успешную отправку сообщения. У Broker есть две стратегии сброса сообщений: синхронный сброс и асинхронный сброс, где синхронный сброс может гарантировать успешное сохранение сообщений на диск. Существует также два типа режимов потребления потребителей: потребление кластера и широковещательное потребление.По умолчанию используется потребление кластера.Если потребитель зависает в режиме кластера, другие потребители в группе возьмут на себя управление. Подводя итог, можно сказать, что он очень надежен.
- 6. Поддержка распределенных транзакционных сообщений: здесь используется полуподтверждение сообщений и механизмы проверки сообщений для обеспечения распределенных транзакционных сообщений, которые будут подробно описаны ниже.
- 7. Поддержка фильтрации сообщений: рекомендуется использовать фильтрацию тегов на стороне потребительского бизнеса.
- 8. Поддержка последовательных сообщений: сообщения хранятся в режиме FIFO очереди в Брокере, то есть передача является последовательной, пока гарантируется последовательность потребления.
- 9. Поддержка сообщений о времени и отложенных сообщений: механизм сообщений о времени в Брокере, сообщения, отправленные Брокеру, не будут потребляться Потребителем немедленно, но не будут потребляться до определенного периода времени. То же самое верно и для отложенных сообщений, которые будут потребляться Потребителем после определенной задержки.
Хорошо, что можно так много сказать 😄
Архитектура RoctetMq
Вернемся к первому вопросу, в чем принцип RocketMq, то есть как его добиться, сначала посмотрите на картинку
RocketMq состоит из четырех частей: NameServer, Broker, Producer и Consumer, каждая из которых развернута в кластере.
NameServer
NameServer — это сервер без сохранения состояния, роль которого аналогична роли Zookeeper в Dubbo, но легче, чем у Zookeeper.
Функции:
- Каждый узел NameServer независим друг от друга и не имеет никакого информационного взаимодействия друг с другом.
- Сервер имен почти не имеет состояния. Он идентифицирует себя как псевдокластер, развертывая несколько узлов. Производитель получает информацию о маршрутизации темы от сервера имен перед отправкой сообщения, то есть, к какому брокеру и потребителю будут отправлены сообщения. также периодически отправлять сообщение с NameServer.Чтобы получить информацию о маршрутизации темы, Брокер регистрируется на NameServer при его запуске, регулярно устанавливает пульсирующее соединение и регулярно синхронизирует поддерживаемую тему с NameServer.
Есть две основные функции:
- 1. Поддерживать длительное соединение с узлом Брокера.
- 2. Сохраняйте информацию о маршрутизации темы.
Broker
Роль хранилища и ретрансляции сообщений отвечает за хранение и пересылку сообщений.
- Брокер поддерживает внутреннюю очередь сообщений, которая используется для хранения индекса сообщения Местом, где сообщение фактически хранится, является CommitLog (файл журнала).
- Один брокер поддерживает длительные соединения и тактовые импульсы со всеми серверами имен и будет регулярно синхронизировать информацию темы с серверами имен.Нижний уровень связи с серверами имен реализуется через Netty.
Producer
Производитель сообщений, бизнес-сторона, отвечает за отправку сообщений, которые реализуются и распространяются самими пользователями.
Балансировка нагрузки производителя
Балансировка нагрузки источника реализована с помощью MQFaultStratege.selectOneMessageQueue(). Этот метод заключается в случайном выборе брокера для отправки сообщения для достижения эффекта балансировки нагрузки.Критерии выбора: старайтесь не выбирать только что выбранного посредника, старайтесь не выбирать посредника, который отправляет последнее сообщение с большой задержкой или без ответа. , то есть найти доступного брокера. (Исходный код не выложен)
Три стратегии отправки
Producer отправляет сообщения тремя способами: синхронным, асинхронным и односторонним.
- Синхронный: синхронная отправка означает, что отправитель отправляет данные и ждет, пока получатель отправит ответ, прежде чем отправлять следующий пакет данных. Обычно используется для уведомлений о важных сообщениях, таких как электронные письма с важными уведомлениями или маркетинговые текстовые сообщения.
- Асинхронная: асинхронная отправка означает, что после того, как отправитель отправит данные, он отправляет следующий пакет данных, не дожидаясь, пока получатель отправит ответ. Обычно он используется в сценариях, где ссылка может занять много времени, а время отклика является критичным. Например, после загрузки видео уведомление запускает службу транскодирования.
- Односторонний: Односторонняя отправка означает, что он отвечает только за отправку сообщений, не дожидаясь ответа получателя, и нет функции обратного вызова.Это подходит для сценариев, которые занимают меньше времени и не требуют высокой надежности, таких как как сбор журналов.
Consumer
Потребитель сообщений, отвечающий за потребление сообщений, реализуется пользователем и развертывается в кластере.
Двухтактная модель потребления
- PULL: потребитель по запросу активно извлекает сообщения от брокера для потребления. Пока сообщение извлекается, запускается процесс потребления, который называется активным потреблением.
- PUSH: потребители push — это прослушиватели для регистрации сообщений, а прослушиватели реализуются самими пользователями. Когда сообщение достигает сервера брокера, прослушиватель извлекает сообщение, а затем запускает процесс потребления. Но на самом деле он все равно тянет сообщения от брокера, что называется пассивным потреблением.
кластер или широковещательный
В зависимости от бизнес-требований по умолчанию используется потребление кластера.
- Потребление кластера: сообщение в брокере будет отправлено единственному потребителю в группе потребителей, подписанной на эту тему для потребления. Если этот потребитель умрет, другие потребители в группе возьмут на себя его потребление.
- Широковещательное потребление: сообщение в брокере будет отправлено каждому потребителю в группе потребителей, подписанной на эту тему, для потребления.
Балансировка нагрузки потребителей
- Балансировка нагрузки потребителей относится к назначению очередей сообщений в MessageQueue конкретным потребителям в группе потребителей.
- Потребитель создаст экземпляр rebalanceImpl при запуске, этот класс отвечает за балансировку нагрузки на стороне потребителя. Вызовите allocateMesasgeQueueStratage.allocate() через rebalanceImpl для завершения балансировки нагрузки.
- Каждый раз, когда в группу добавляется новый потребитель, распределение будет переназначаться. Балансировка нагрузки выполняется автоматически каждые 10 секунд.
Модель сообщений RocketMq (специализированный термин)
Новички могут понять.
Message
Это сообщение должно быть передано.Сообщение должно иметь тему, а также сообщение может иметь необязательный тег (тег) и дополнительную пару ключ-значение, которую можно использовать для установки бизнес-ключа, который легко найти на сервере брокера во время разработки информация.
Topic
Тема — это тип сообщения первого уровня, каждое сообщение имеет тему, как и почтовый адрес письма. Тема - это наш конкретный бизнес.Например, система электронной коммерции может иметь новости о заказах, новости о товарах, новости о покупках, новости о транзакциях и так далее. Тема имеет очень свободную связь с производителями и потребителями.Производители и темы могут быть «один ко многим», «многие к одному» или «многие ко многим», и то же самое верно для потребителей.
Tag
Метка — это тип сообщения второго уровня, который может использоваться в качестве бизнес-различения второго уровня в рамках определенного типа бизнеса, его основная цель — фильтрация сообщений на стороне потребителя. Например, сообщения о покупке делятся на сообщения о создании покупки, сообщения о просмотре покупки, push-сообщения о покупке, сообщения о покупке на складе, сообщения о покупке и т. д. Эти сообщения имеют одну и ту же тему и разные теги и могут использоваться только тогда, когда потребителю нужно покупка складских сообщений.Тег для достижения фильтрации, теги, которые не куплены и не сохранены в сообщении, не будут обрабатываться.
Group
Группа, которая может быть разделена на группу потребителей ProducerGroup, производящую комбинацию ConsumerGroup, группа может подписаться на несколько тем. Вообще говоря, производители и потребители определенного вида одного и того же бизнеса помещаются в группу.
Message Queue
Очередь сообщений, тему можно разделить на несколько очередей сообщений. Тема — это просто логическое понятие.Очередь сообщений — это физическая единица управления сообщением.При отправке сообщения брокер опрашивает все очереди сообщений, содержащие тему, а затем отправляет сообщение. С помощью очереди сообщений хранилище сообщений может быть распределенным и кластеризованным с горизонтальной масштабируемостью.
offset
Относится к смещению в очереди сообщений, которое можно рассматривать как индекс, а очередь сообщений можно рассматривать как массив. Смещение — тип java long, бит 64. По идее, через 100 лет оно не переполнится, поэтому можно считать, что очередь сообщений — это структура данных с бесконечной длиной.
основная проблема
последовательное сообщение
- Как гарантировать последовательные сообщения?
Очередь сообщений, отправляемая производителем брокеру, представляет собой FIFO, поэтому отправка является последовательной, а сообщения в одной очереди — последовательными. Использование нескольких очередей одновременно не может гарантировать упорядочение сообщений. Следовательно, для одной и той же темы и одной и той же очереди поток отправляет сообщение при отправке сообщения, а поток потребляет сообщение в очереди при потреблении.
- Последующие действия: как убедиться, что сообщения отправляются в одну и ту же очередь?
RocketMQ предоставляет нам интерфейс MessageQueueSelector, который можно переписать для реализации нашего собственного алгоритма, такого как оценка i%2==0, а затем отправить сообщение в очередь1 или отправить его в очередь2.
фильтрация сообщений
- Как реализовать фильтрацию сообщений?
Есть два решения. Одно — фильтровать в соответствии с логикой дедупликации потребителя на стороне брокера. Преимущество этого — избежать передачи бесполезных сообщений на сторону потребителя. Недостаток — увеличение нагрузки на брокера и относительно сложна в реализации. Другой - фильтрация на стороне потребителя, например, дедупликация в соответствии с тегом, установленным в сообщении.Преимущество этого заключается в простоте реализации, но недостаток в том, что большое количество бесполезных сообщений достигает стороны потребителя и можно только выбросить и не обрабатывать.
дедупликация сообщений
- Если несколько повторяющихся сообщений доставляются Потребителю из-за сети или по другим причинам, как вы устраняете дубликаты сообщений?
Это должно сначала говорить о принципе идемпотентности сообщений: то есть результат нескольких запросов, инициированных пользователем для одной и той же операции, одинаков, и разные результаты не будут получены из-за нескольких операций. Пока поддерживается идемпотентность, независимо от того, сколько сообщений приходит, конечный результат обработки один и тот же, что должно быть реализовано на стороне потребителя.
Решение для дедупликации: поскольку каждое сообщение имеет идентификатор MessageId, гарантируется, что каждое сообщение имеет уникальный ключ, который может быть первичным ключом или ограничением уникальности базы данных или ключом в кэше Redis. Прежде чем использовать сообщение, сначала проверьте его. .Независимо от того, существует ли уникальный ключ в базе данных или кэше, если он существует, сообщение не будет обработано.Если потребление прошло успешно, убедитесь, что уникальный ключ вставлен в таблицу дедупликации.
Распределенный обмен транзакционными сообщениями
- Вы знаете полуновости? Как RocketMQ реализует сообщения о распределенных транзакциях?
Полусообщение: относится к сообщению, которое временно не может быть использовано потребителем.Производитель успешно отправляет сообщение брокеру, но сообщение помечается как «временно недоставленное» только после того, как производитель завершит выполнение локальной транзакции и подтвердит ее. дважды Потребитель может использовать это сообщение.
На приведенном выше рисунке показан процесс реализации сообщения о распределенной транзакции, который основан на полусообщении, вторичном подтверждении и механизме проверки сообщения.
- 1. Производитель отправляет полусообщение брокеру
- 2. Производитель получает ответ, и сообщение успешно отправляется.В это время сообщение представляет собой половинное сообщение, помеченное как «недоставленное», и Потребитель не может его использовать.
- 3. Сторона производителя выполняет локальные транзакции.
- 4. При нормальных обстоятельствах, когда выполняется локальная транзакция, Производитель отправляет Коммит/Откат Брокеру. Если это фиксация, Брокер помечает половинное сообщение как обычное сообщение, и Потребитель может его использовать. это Откат, Брокер отклонит сообщение.
- 5. В нештатной ситуации Брокерская сторона не может дождаться второго подтверждения. Через определенный период времени будут опрошены все полусообщения, а затем будет опрошена реализация полусообщений на стороне Продюсера.
- 6. Сторона производителя запрашивает статус локальной транзакции.
- 7. Отправьте брокеру фиксацию/откат в соответствии со статусом транзакции. (5, 6, 7 — обратные сообщения)
доступность сообщений
- Как RocketMQ может гарантировать доступность/надежность сообщений? (Другой способ задать этот вопрос: Как сделать так, чтобы сообщения не терялись)
Ответ заключается в следующем, с точки зрения производителя, потребителя и брокера.
С точки зрения производителя, как убедиться, что сообщение успешно отправлено брокеру?
- 1. Можно использовать синхронную передачу, то есть отправлять часть данных и ждать, пока получатель вернет ответ, прежде чем отправлять следующий пакет данных. Если ответ в порядке, это означает, что сообщение было успешно отправлено брокеру, а тайм-аут состояния или сбой запускает вторую повторную попытку.
- 2. Может быть принят способ доставки сообщений о распределенных транзакциях.
- 3. Если время ожидания сообщения истекло после отправки, вы также можете проверить, успешно ли оно сохранено в Брокере, запросив API журнала.
В общем, Producer по-прежнему гарантирует синхронную отправку.
С точки зрения брокера, как обеспечить постоянство сообщений?
- 1. Пока сообщение сохраняется в CommitLog (файл журнала), даже если брокер выходит из строя, неиспользованные сообщения могут быть восстановлены и использованы снова.
- 2. Механизм сброса брокера: синхронный сброс и асинхронный сброс, независимо от того, какой вид сброса может гарантировать, что сообщение должно быть сохранено в кэше страниц (в памяти), но синхронный сброс более надежен, это данные после того, как производитель отправляет После сохранения на диск ответ возвращается источнику.
- 3. Брокер поддерживает режимы синхронной двойной записи с несколькими ведущими и несколькими подчиненными и асинхронные режимы репликации с несколькими ведущими и несколькими подчиненными.Сообщения отправляются на главный хост, но потребление может потребляться как ведущим, так и подчиненным. Синхронный режим двойной записи может гарантировать, что даже в случае выхода из строя ведущего, сообщение должно быть сохранено в ведомом, что гарантирует, что сообщение не будет потеряно.
С точки зрения потребителя, как обеспечить успешное использование сообщений?
- Сам потребитель поддерживает постоянное смещение (соответствующее минимальному смещению в очереди сообщений), которое используется для пометки нижнего индекса сообщения, которое было успешно использовано и успешно отправлено обратно брокеру. Если Потребитель не может потреблять, он отправит обратно статус сбоя потребления Брокеру, и он обновит свое смещение, когда оно будет успешно отправлено обратно. Если брокер зависает при отправке обратно брокеру, потребитель будет периодически повторять попытку.Если потребитель и посредник зависают вместе, сообщение по-прежнему сохраняется на стороне посредника, а смещение на стороне потребителя также сохраняется. .После перезапуска продолжает тянуть предыдущее смещение.Сообщения расходуются.
Реализация щеточного диска
RocketMQ предоставляет две стратегии сброса: синхронный сброс и асинхронный сброс.
- Синхронная очистка: после того, как сообщение достигает памяти брокера, оно должно быть сброшено в файл журнала commitLog, чтобы считаться успешным, а затем возвращается сообщение о том, что данные отправителя были успешно отправлены.
- Асинхронная очистка. Асинхронная очистка означает, что после того, как сообщение достигает памяти посредника, он возвращает, что данные источника были успешно отправлены, и пробуждает поток для сохранения данных в файле журнала CommitLog.
Анализ преимуществ и недостатков: Синхронный сброс гарантирует, что сообщения не будут потеряны, но время отклика примерно на 10% больше, чем при асинхронном сбросе, что подходит для сценариев, требующих высокой надежности сообщений. Пропускная способность асинхронной очистки диска относительно высока, а RT невелико, но при выключении брокера некоторые данные в памяти будут потеряны, что подходит для сценариев с высокими требованиями к пропускной способности.
балансировки нагрузки
- Можете ли вы рассказать мне, как реализована балансировка нагрузки RocketMQ?
RocketMQ — это служба распределенных сообщений, и она выполняется клиентами, которые воспроизводят и потребляют во время балансировки нагрузки.Как упоминалось выше, nameServer хранит информацию о маршрутизации темы, а маршрутизация записывает коммуникационный адрес узла кластера брокера, имя брокера и количество очередей чтения и записи. Очередь записи writeQueue представляет количество очередей, в которые производитель может записывать.Если не настроено, значение по умолчанию равно 4, т. е. queueId равен 0, 1, 2, 3. После того, как посредник получает сообщение, он генерирует очередь сообщений в соответствии с queueId и процесс балансировки нагрузки производителя. Суть заключается в процессе выбора кластера брокера и queueId. Очередь чтения readQueue указывает количество очередей в брокере, которые могут использоваться потребителями для чтения информации.По умолчанию также равно 4, то есть queueId также равен 0, 1, 2 и 3. После того, как потребитель получит информацию о маршрутизации, он выберет идентификатор очереди и прочитает потребление данных от соответствующего брокера.
Ниже я объясню с точки зрения балансировки нагрузки производителя и балансировки нагрузки потребителя:
- Балансировка нагрузки производителя: суть в том, чтобы выбрать объект MessageQueue (внутренне содержащий BrokerName и queueId).Первый — это стратегия по умолчанию, которая случайным образом выбирает один из списка MessageQueue.Алгоритм использует самоинкрементное случайное число взять оставшуюся часть списка для получения информации о позиции, но кластер, в котором находится полученная MessageQueue, не может быть кластером, отказавшим в прошлый раз. Во-вторых, это стратегия допустимого времени ожидания. Во-первых, очередь сообщений выбирается случайным образом. Если отправка завершается сбоем из-за исключений, таких как время ожидания, другие очереди сообщений в кластере брокера будут предпочтительно выбраны для отправки. Политика по умолчанию используется, если она не найдена. .
- Балансировка нагрузки для потребителей: здесь есть шесть необязательных алгоритмов.
1. Алгоритм среднего распределения
2. Кольцевой алгоритм
3. Укажите алгоритм компьютерного зала
4. Алгоритм ближайшего компьютерного зала
5. Единый алгоритм хеширования
Алгоритм согласованного хеширования используется для загрузки, и при каждой загрузке будет перестраиваться таблица маршрутизации согласованного хэша, чтобы получить всю информацию об очереди, за которую отвечает локальный клиент. Алгоритм хеширования по умолчанию — MD5.Предполагается, что имеется 4 клиента-потребителя и 2 очереди сообщений mq1 и mq2.После хэширования они распределяются по разным позициям хэш-кольца.Согласно принципу последовательного хеширования по часовой стрелке, mq1 потребляется клиентом 2. mq2 потребляется клиентом 3.
6. Алгоритм ручной настройки
Суммировать
Сегодня я в основном говорил о базовой архитектуре RocketMq и о том, как сообщения отправляются, хранятся и потребляются.Необходимо освоить соответствующие обязанности NameServer, Broker, Producer и Consumer, а также какие усилия были предприняты с точки зрения сообщений, таких как как обеспечение высокой доступности, гарантированный порядок, обеспечение распределения, реализация фильтрации, реализация дедупликации, а также балансировка нагрузки и т. д.