Промежуточное программное обеспечение сообщений, которое использовалось ранее,ActiveMQ
, из-за отсутствия документов по техническому обслуживанию и данным проблема не проста в устранении, поэтому при этой возможности после исследования и выбора для использованияRocketMQ
, давайте поделимся основными знаниями об этом.
RocketMQ
чистыйJava
, распределенное ПО промежуточного слоя для сообщений с открытым исходным кодом, ранее известное какMetaQ
, представляет собой промежуточное программное обеспечение сообщений модели очереди, разработанное Али, а затем открытое дляapache
фундамент сталapache
Проект с открытым исходным кодом высшего уровня с высокой производительностью, высокой надежностью, высокими характеристиками реального времени и распределенными характеристиками.
заимствовано изKafka
, сравните два,RocketMQ
Предпочитают стабильную и бизнес-ориентированную деятельность.
1. Общая структура
Есть четыре основных компонентаСервер имен, Брокер, Производитель, Потребитель
Каждый модуль на приведенном выше рисунке будет устанавливать длинные связи друг с другом для связи друг с другом.Этот архитектурный шаблон связан сDubbo
Это немного похоже, есть реестр для поддержания соответствующей информации, разница в том, чтоBroker
Этот модуль играет важную роль в системе сообщений.
2. Преимущества и особенности
Стабильность, высокая производительность и расширенные типы сообщений
В Интернете уже есть много связанных преимуществ и недостатков. Вот два рекомендуемых:
Введение в производительность Ali RocketMQ
Сравнение RocketMQ и kafka (18 отличий)
3. Кратко опишите понятия каждого существительного
Topic
Тема сообщения, представляет собой логическую классификацию сообщений, например, к какому типу операций относится этот тип сообщения. Например, связанные с запасами, связанные с заказами, связанные с деятельностью и т. д. Это понимается как абстрактная спецификация классификации.Операции каждого классифицируются в соответствии с темой.Разные производители отправляют сообщения в указанную тему, а разные потребители подписываются на указанную тему, извлекают потребление сообщений сверху и защищают базовое хранилище сообщений.
Tag
Для дальнейшего уточнения темы в официальной документации Alibaba Cloud есть эта строка комментариев.«Тег сообщения, который можно понимать как тег в Gmail, переклассифицирует сообщения, чтобы потребители могли указать условия фильтрации для фильтрации на сервере версии очереди сообщений RocketMQ»
Подробнее см. здесь:Рекомендации по темам и тегам
Message
это в очереди сообщенийноситель сообщения.
Отправка сообщения относится к отправке в тему темы, где каждое сообщение включает следующие части:
- Message ID
Глобально уникальный идентификатор сообщения, заданныйRocketMQ
Автоматически генерируется в процессе отправки для уникальной идентификации фрагмента информации.
- Message key
Бизнес-идентификатор сообщения от производителя сообщения.Producre
Установите для уникальной идентификации бизнес-логики.
- Message Body
Тело содержимого, переносимое сообщением, как правило, настраивает содержимое, доставляемое здесь, не забудьте сериализовать содержимое сообщения.
Конкретная структура ядра сообщения может относиться к отображению в консоли.Message Detail
MessageQueue
упоминалось ранееTopic
это абстрактная концепция, в которой фактическое сообщение отправляется и потребляетсяMessage Queue
, каждыйTopic
Ниже может быть несколько очередей сообщений. Причиной введения очередей является повышение доступности и гибкости в зависимости от характера очередей.FIFO
, сообщения, отправленные первыми, используются первыми.
Например, по умолчаниюTopic
назначит четыреMessage Queue
(конфигурация параметра: defaultTopicQueueNums), если их дваBroker
, он будет разделен на два, если один строить локальноBroker
, то вы должны увидеть, как я:Четыре очереди сообщений под одним брокером
Пункт назначения отправки сообщения и источник получения потребленияMessage Queue
Group
Информация о группе, группа может подписаться на несколькоTopic
.
В частности, его можно разделить наProducer Group
а такжеConsumer Group
, в приложении может быть создано несколько групп отправителей и групп потребителей, но рекомендуемое использование заключается в том, что приложение указываетProducer Group
, информация об отправителе единой системы обмена сообщениями.
Как правило, приложению нужно только настроить группу потребителей и отдельно подписаться на тему для потребления. еслиTopic
В приложении я хочу установить две логики обработки,Затем вы можете настроить разные группы потребителей и установить разные процессоры для одного и того же сообщения темы.Handler
.
Offset
Величина смещения, используемая для сохранения хода потребления сообщения.
Из вышеизложенного понятно, чтоBroker
Будет несколькоMessage Queue
, нам нужно использовать индекс для записи места потребления сообщения. пройти черезOffset
Вы можете найти позицию сообщения о текущем потреблении, указывающуюConsumer
следующий изOffest
Последняя позиция потребляет сообщение.
В кодеOffset
даlong
базовый тип, в соответствии с которым осуществляется доступMessage Queue
сообщение в указанном месте.
Заказать заказ сообщения
Тип сообщения, который публикуется и используется последовательно, разделенный наГлобальные последовательные сообщения и последовательные сообщения разделов.
глобальный порядок: Это легче понять, для того жеTopic
Сообщения, независимо от того, сколько потребителей, сообщения могут быть удалены из очереди только одно за другим по порядку, а следующееMessage
Потребление зависит от завершения предыдущего потребления. Он подходит для сценариев с низкими требованиями к производительности, но этот режим выбирается редко.
порядок разделов: черезSharding Key
для разделения блоков. Сообщения в пределах одного раздела следуют строгимFIFO
Публикуйте и потребляйте последовательно.
Вот пример из документации:
Когда создается заказ электронной коммерции, идентификатор заказа используется в качестве ключа разделения, тогда сообщение о создании заказа, сообщение об оплате заказа, сообщение о возврате заказа и сообщение о логистике заказа, относящиеся к тому же заказу, будут использоваться в порядке, в котором они освобождены.
в состоянии пройтиSharding Key
Чтобы обеспечить последовательную отправку и потребление сообщений одного типа и пользователей, он не только обеспечивает высокую параллельную обработку сообщений, но и обеспечивает непрерывность бизнеса.
На картинке выше показанопорядок разделов, с точки зрения реализации кода, вам нужно установить пользовательскийSelector
, а затем для входного параметраarg
парсинг и выбор политикиmq
, такие как общий хэш по модулю, стратегия выбора может относиться к этому классу:SelectMessageQueueByHash
шаблон потребления сообщений
существуетRocketMQ
, реализовано два режима потребления
-
ВЫТЯГИВАЮЩИЙ режим:
Consumer
Активен с сервера сообщенийBroker
Получите сообщение. -
НАЖИМНОЙ режим:сервер сообщений
Broker
Активно отправляйте сообщения наConsumer
.
Цитируя описание [Магазин тофу Fujiwara-]:
-
MQPullConsumer:Процесс получения сообщений должен быть написан пользователем, сначала через предполагаемое потребление.
Topic
получитьMessageQueue
сбор, обходMessageQueue
установить, то для каждогоMessageQueue
Извлекать сообщения пакетами, после одной выборки записывать начало очереди для выборки в следующий разoffset
, пока он не закончится, затем измените другойMessageQueue
. -
MQPushConsumer:
consumer
Инкапсулировать процесс опроса и зарегистрироватьсяMessageListener
Слушатель, проснись после получения сообщенияMessageListener
изconsumeMessage()
для потребления пользователю кажется, что сообщение отправляется (push
) над.
ПроверятьSpringBoot
внутри@RocketMQMessageListener
Реализация аннотаций и групповое использование вторичной инкапсуляцииRocketMQ
, обнаружил, что используя обаMQPushConsumer
, упакованныйpull
процесс голосования, поэтому можно считать, чтоRocketMQ
используетPull
Режим потребления режима вытягивания.
повтор сообщения
Существует три семантики сообщений:
- Не более одного раза
- Хотя бы один раз
- Ровно раз
Из-за колебаний сети неизбежно, что при передаче сообщения по сети отправитель считает, что первая отправка не удалась, и повторяет попытку отправки, поэтому проблему, которую мы хотим решить, можно понять как:Два одинаковых сообщения, как сделать так, чтобы программа их корректно обработала?
В настоящее время распространены две практики:
1. Гарантированные новостиидемпотентность
2,система сообщенийотфильтровать повторяющиеся сообщения илипотребительФильтровать повторяющиеся сообщенияИдемпотентность:Используйте математические понятия, чтобы углубить понимание, например функциюf(x)
, x — это сообщение, то независимо от того, сколько раз сообщение будет использовано повторно,f(f(x))
Результаты такие же. Не будет побочных эффектов из-за многократного употребления, что гарантирует правильность данных.
Дедупликация сообщений:Это легко понять, каждое сообщение имеет глобально уникальныйMessage ID
, доступно в системе сообщенийBroker
фильтр у потребителяConsumer
фильтровать в.
В настоящее время во введении, не виделRocketMQ
существуетBroker
Фильтрация и дедупликация на месте, поэтому фильтровать нужно на стороне потребителя. Вы можете рассмотреть возможность добавления таблицы базы данных для записи обработанныхMessage ID
, если встречается дублирующее сообщение, оно не будет обработано, и сообщение в обработке может быть помещено первымRedis
, чтобы не потреблять одно и то же сообщение в одно и то же время~
В-четвертых, детализируйте основные модули
Из предыдущей диаграммы архитектуры мы видим, что есть четыре основных модуля, От сообщения, отправленного производителем, до потребления потребителем были испытаны следующие процессы:
сначала сProducer
перспективы, с одним изns
Установите длинную ссылку, а затем периодически отправляйте пульсации для поддержания статуса, получайтеTopic
информация о маршрутизации темы, затем с помощьюBroker Master
Установите длинные ссылки и регулярно отправляйте контрольные сообщения, чтобы определить, доступны ли они. По типу отправляемого сообщения определите, нужно лиBroker
Возвращаемое значение.
отConsumer
Под углом, сProducer
Отличие в том, что это может бытьBroker Master
подписаться на сообщения или изBroker Salve
Подпишитесь на новости, и розыгрыш здесь повторяться не будет.
потому чтоRocketMQ
чистыйJava
язык, так что вы можетеGithub
Загрузите исходный код, чтобы просмотреть подробный дизайн каждого модуля.
Nameserver
Nameserver
Для управления информацией о подписке на сообщения, отправке и потреблении сообщений каждая служба в кластере должна пройтиNameserver
понять их состояние.
Немного какDubbo
реестр вZookeeper
,NameServer
поддерживается вProducer
кластер,Broker
кластер,Consumer
Состояние службы кластера. Поддерживайте и обновляйте статус каждой службы, отправляя пакеты пульса через регулярные промежутки времени (по умолчанию 30 секунд).
①, получитьBroker
запросить, зарегистрироватьсяBroker
информация о маршрутизации
②、ИнтерфейсClient
просьба, согласноTopic
получить егоBroker
информация о маршрутизации
NameServer
Нет состояния, можно масштабировать по горизонтали. каждыйBroker
прибудет при запускеNameServer
регистр;Producer
Перед отправкой сообщения оно будет основываться наTopic
прибытьNameServer
получить маршрут (кBroker
)Информация;Consumer
также будет получаться периодическиTopic
маршрутная информация.
оNamesrv
, после понимания вышеуказанных понятий вы можете просмотреть конкретный процесс запуска в коде:
org.apache.rocketmq.namesrv.NamesrvStartup#main
Broker
Broker
Расположение сервера хранилища брокера сообщений отвечает за сохранение сообщений и управление ходом их использования.
Представьте его особенности:
① со всемиNamesrv
Узел поддерживает длинную связь и пульсацию, а время (по умолчанию 30 с) будетTopic
информация, зарегистрированная наNamesrv
.
② Отвечает за хранение сообщений вTopic
Для поддержки облегченных очередей одна машина может поддерживать десятки тысяч очередей и поддерживать модель push-pull сообщений.
③ Он может накапливать сотни миллионов сообщений и может строго гарантировать порядок сообщений.
Конкретная запись кода запуска:
org.apache.rocketmq.broker.BrokerStartup#main
В процессе инициализации много чего нужно сделать, например хранение сообщений, удаленные сервисы, фильтры и т.д., на первый взгляд немного кружится голова, и дальше я не слежу. можете перейти к исходному коду, чтобы увидеть =-=
Producer
Отправитель сообщений, основная роль отправки сообщений в очередь сообщений.
На картинке нижеRocketMQTemplate
перечислитьconvertAndSend
Схема последовательности вызова метода:
Основные шагиDefaultMQProducerImpl
Метод реализации отправки , нижний слой выбирает тот, на который нужно отправитьMessageQueue
, выполнить предварительный хук, передатьNettyClinet
отправить запрос, выполнить post-hook после отправки и, наконец, вернутьSendResult
.
В частности, его можно найти вSpringBoot
введен вRocketMQ-Starter
Зависимость, а затем отправьте сообщение для просмотра всей цепочки вызовов.
Consumer
- Основа идентификации
В потребительской группе можно задать множествоListener
, потреблять разныеTopic
сообщение ниже.
При использовании сообщений вам необходимо пройтиConsumerGroup
+ Topic
+ Tag
быть уникальнымListener
.
Поэтому одна и та же группа потребления и одна и та же тема не могут отображаться одинаково.Tag
изListener
, приложение сообщит об ошибке при запуске.
- Код
Код реализации сProducer
В этом же модуле:client
Прежде чем говорить о потреблении сообщений, давайте посмотрим, что при запуске приложение будет сканироватьRocketMQMessageListener
аннотированныйbeans
, а затем зарегистрируйте контейнер
По моему мнению, вfor
Регистрация отсканирована в циклеbean
, затем вcreateRocketMQListenerContainer
собратьDefaultRocketMQListenerContainer
, затем вSpring
Зарегистрируйтесь в контейнере и дождитесь последующего потребления.
В то же время поток демона постоянноBroker
Поднимите сообщение и используйте его после прослушивания сообщения, которое соответствует условиям:
Выше приведена примерно регистрация при запуске приложенияListener
И процесс зацикливания для получения сообщения, в частностиConsumer
Запись кода при запуске находится здесь:
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#start
Процесс регистрации потребителей относительно долгий, и вам нужно следить за ним медленно ~
5. Специальное использование (код и пользовательский интерфейс мониторинга)
Код загрузки (внешний и бинарный стартовые пакеты)
-
external: расширенный контент, чтобы получить станцию мониторинга
Console
Адрес следующий:
https://github.com/apache/rocketmq-externals.git
-
install: Стартовый пакет приложения для развертывания
Namesrv
а такжеBroker
https://rocketmq.apache.org/dowloading/releases/
-
код реализации программы: Открытый исходный код, внес вклад
Github
, вы можете загрузить и упаковать свои собственные идеи использования и ссылки на реализацию, учиться и учиться~
https://github.com/apache/rocketmq
Запустить сервер имен и брокер
Перед запуском убедитесь, что локальная глобальная переменная содержитJAVA_HOME
переменные, например:
$ echo ${JAVA_HOME}
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home
в загрузкуreleases
Каталог бинарного установочного пакета
$ cd rocketmq-all-4.7.0-bin-release/bin
$ sh paly.sh
Нажмите, чтобы просмотретьplay.sh
скрипт, вы можете видеть, что он запуститсяNamesrv
а такжеBroker
После запуска двух служб вы можете перейти кBroker
Отправка и использование сообщений
Стартер с интеграцией Springboot
используется здесьSpringboot
интегрированныйstarter
Модуль, вы можете обратиться к этой статье для получения подробной информации:
Woohoo. Бросьте Arlington Terrier.com/Apache-rock…
После личной интеграции введитеDemo
изRocket
Под содержанием:
GitHub.com/VIP-AU Stories/ Это…
здесь, чтобы сказатьListener
Процесс обработки потребителя сообщений приложение продолжаетbroker
попасть под наблюдениеTopic
сообщение, а затем найдите соответствующееConsumer
Потреблять:
Вы можете проследить ссылку вызова в левой части рисунка выше, чтобы понять общую связь потребителей, потребляющих сообщения.
Начать мониторинг использования пользовательского интерфейса
введите скачанныйexternal
путь, естьrocketmq-console
каталог, чтобы предотвратить занятие порта 8080, его необходимо изменить.Есть два конкретных места для изменения:
# UI 监控系统的访问端口
server.port=10010
# Namesrv 的地址,如果有多个,请用分号 ; 分隔开
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=localhost:9876
rocket-console
ЯвляетсяSpringboot
Проект после изменения соответствующей конфигурации необходимо упаковать, а затем развернуть.
$ mvn clean install -DskipTests
$ java -jar target/rocketmq-console-ng-1.0.1.jar
Затем войдите в порт, установленный ранее, вы можете увидеть платформу мониторинга:
Панель навигации в верхней части определяет имеющиеся у нее функции, например, желание увидетьTopic
которомуBroker
, а затем подписавшиеся группы потребителей можно просмотреть с помощью мониторинга:
6. План последующего исследования
В повседневном использовании в основном нет необходимости модифицироватьNameserver
а такжеBroker
Услуги этих двух модулей больше касаютсяProducer
а такжеConsumer
использовать, в то же время, они могут быть инкапсулированы дважды, чтобы заменитьDefaultMQProducer
а такжеMQConsumer
Реализуйте классы для создания отправителей и потребителей, которые подходят вашему бизнесу.
Этот обмен представляетRocketMQ
Архитектура проекта, дизайн основных модулей, расположение исходного кода, общий процесс отправки и потребления сообщений, а также введениеRocketMQTemplate
Базовое использование и мониторингConsole UI
Тем не менее, я не понял подробного дизайна величины смещения, формата хранения сообщений, синхронизации, метода асинхронной очистки, повторения сообщений и т. Д., Если будет возможность, я узнаю и поделюсь этим позже ~
использованная литература
1,Официальная документация по облачным продуктам Alibaba
2,Глоссарий Alibaba Cloud RocketMQ
3.«Глубокое понимание RocketMQ» — механизм доставки сообщений MQ.
6.RocketMQ делится полным документальным фильмом
7.Введение в производительность Ali RocketMQ