Совместное использование обучения RocketMQ

RocketMQ

Промежуточное программное обеспечение сообщений, которое использовалось ранее,ActiveMQ, из-за отсутствия документов по техническому обслуживанию и данным проблема не проста в устранении, поэтому при этой возможности после исследования и выбора для использованияRocketMQ, давайте поделимся основными знаниями об этом.

RocketMQчистыйJava, распределенное ПО промежуточного слоя для сообщений с открытым исходным кодом, ранее известное какMetaQ, представляет собой промежуточное программное обеспечение сообщений модели очереди, разработанное Али, а затем открытое дляapacheфундамент сталapacheПроект с открытым исходным кодом высшего уровня с высокой производительностью, высокой надежностью, высокими характеристиками реального времени и распределенными характеристиками.

заимствовано изKafka, сравните два,RocketMQПредпочитают стабильную и бизнес-ориентированную деятельность.

1. Общая структура

Есть четыре основных компонентаСервер имен, Брокер, Производитель, Потребитель

Каждый модуль на приведенном выше рисунке будет устанавливать длинные связи друг с другом для связи друг с другом.Этот архитектурный шаблон связан сDubboЭто немного похоже, есть реестр для поддержания соответствующей информации, разница в том, чтоBrokerЭтот модуль играет важную роль в системе сообщений.

2. Преимущества и особенности

Стабильность, высокая производительность и расширенные типы сообщений

В Интернете уже есть много связанных преимуществ и недостатков. Вот два рекомендуемых:

Введение в производительность Ali RocketMQ

Сравнение RocketMQ и kafka (18 отличий)

3. Кратко опишите понятия каждого существительного

Topic

Тема сообщения, представляет собой логическую классификацию сообщений, например, к какому типу операций относится этот тип сообщения. Например, связанные с запасами, связанные с заказами, связанные с деятельностью и т. д. Это понимается как абстрактная спецификация классификации.Операции каждого классифицируются в соответствии с темой.Разные производители отправляют сообщения в указанную тему, а разные потребители подписываются на указанную тему, извлекают потребление сообщений сверху и защищают базовое хранилище сообщений.

Tag

Для дальнейшего уточнения темы в официальной документации Alibaba Cloud есть эта строка комментариев.«Тег сообщения, который можно понимать как тег в Gmail, переклассифицирует сообщения, чтобы потребители могли указать условия фильтрации для фильтрации на сервере версии очереди сообщений RocketMQ»

Подробнее см. здесь:Рекомендации по темам и тегам

Message

это в очереди сообщенийноситель сообщения.

Отправка сообщения относится к отправке в тему темы, где каждое сообщение включает следующие части:

  • Message ID

Глобально уникальный идентификатор сообщения, заданныйRocketMQАвтоматически генерируется в процессе отправки для уникальной идентификации фрагмента информации.

  • Message key

Бизнес-идентификатор сообщения от производителя сообщения.ProducreУстановите для уникальной идентификации бизнес-логики.

  • Message Body

Тело содержимого, переносимое сообщением, как правило, настраивает содержимое, доставляемое здесь, не забудьте сериализовать содержимое сообщения.

Конкретная структура ядра сообщения может относиться к отображению в консоли.Message Detail

MessageQueue

упоминалось ранееTopicэто абстрактная концепция, в которой фактическое сообщение отправляется и потребляетсяMessage Queue, каждыйTopicНиже может быть несколько очередей сообщений. Причиной введения очередей является повышение доступности и гибкости в зависимости от характера очередей.FIFO, сообщения, отправленные первыми, используются первыми.

Например, по умолчаниюTopicназначит четыреMessage Queue(конфигурация параметра: defaultTopicQueueNums), если их дваBroker, он будет разделен на два, если один строить локальноBroker, то вы должны увидеть, как я:Четыре очереди сообщений под одним брокером

Пункт назначения отправки сообщения и источник получения потребленияMessage Queue

Group

Информация о группе, группа может подписаться на несколькоTopic.

В частности, его можно разделить наProducer Groupа такжеConsumer Group, в приложении может быть создано несколько групп отправителей и групп потребителей, но рекомендуемое использование заключается в том, что приложение указываетProducer Group, информация об отправителе единой системы обмена сообщениями.

Как правило, приложению нужно только настроить группу потребителей и отдельно подписаться на тему для потребления. еслиTopicВ приложении я хочу установить две логики обработки,Затем вы можете настроить разные группы потребителей и установить разные процессоры для одного и того же сообщения темы.Handler.

Offset

Величина смещения, используемая для сохранения хода потребления сообщения.

Из вышеизложенного понятно, чтоBrokerБудет несколькоMessage Queue, нам нужно использовать индекс для записи места потребления сообщения. пройти черезOffsetВы можете найти позицию сообщения о текущем потреблении, указывающуюConsumerследующий изOffestПоследняя позиция потребляет сообщение.

В кодеOffsetдаlongбазовый тип, в соответствии с которым осуществляется доступMessage Queueсообщение в указанном месте.

Заказать заказ сообщения

Тип сообщения, который публикуется и используется последовательно, разделенный наГлобальные последовательные сообщения и последовательные сообщения разделов.

глобальный порядок: Это легче понять, для того жеTopicСообщения, независимо от того, сколько потребителей, сообщения могут быть удалены из очереди только одно за другим по порядку, а следующееMessageПотребление зависит от завершения предыдущего потребления. Он подходит для сценариев с низкими требованиями к производительности, но этот режим выбирается редко.

порядок разделов: черезSharding Keyдля разделения блоков. Сообщения в пределах одного раздела следуют строгимFIFOПубликуйте и потребляйте последовательно.

Вот пример из документации:

Когда создается заказ электронной коммерции, идентификатор заказа используется в качестве ключа разделения, тогда сообщение о создании заказа, сообщение об оплате заказа, сообщение о возврате заказа и сообщение о логистике заказа, относящиеся к тому же заказу, будут использоваться в порядке, в котором они освобождены.

в состоянии пройтиSharding KeyЧтобы обеспечить последовательную отправку и потребление сообщений одного типа и пользователей, он не только обеспечивает высокую параллельную обработку сообщений, но и обеспечивает непрерывность бизнеса.

На картинке выше показанопорядок разделов, с точки зрения реализации кода, вам нужно установить пользовательскийSelector, а затем для входного параметраargпарсинг и выбор политикиmq, такие как общий хэш по модулю, стратегия выбора может относиться к этому классу:SelectMessageQueueByHash

шаблон потребления сообщений

существуетRocketMQ, реализовано два режима потребления

  • ВЫТЯГИВАЮЩИЙ режим: ConsumerАктивен с сервера сообщенийBrokerПолучите сообщение.
  • НАЖИМНОЙ режим:сервер сообщенийBrokerАктивно отправляйте сообщения наConsumer.

Цитируя описание [Магазин тофу Fujiwara-]:

  • MQPullConsumer:Процесс получения сообщений должен быть написан пользователем, сначала через предполагаемое потребление.TopicполучитьMessageQueueсбор, обходMessageQueueустановить, то для каждогоMessageQueueИзвлекать сообщения пакетами, после одной выборки записывать начало очереди для выборки в следующий разoffset, пока он не закончится, затем измените другойMessageQueue.

  • MQPushConsumer: consumerИнкапсулировать процесс опроса и зарегистрироватьсяMessageListenerСлушатель, проснись после получения сообщенияMessageListenerизconsumeMessage()для потребления пользователю кажется, что сообщение отправляется (push) над.

ПроверятьSpringBootвнутри@RocketMQMessageListenerРеализация аннотаций и групповое использование вторичной инкапсуляцииRocketMQ, обнаружил, что используя обаMQPushConsumer, упакованныйpullпроцесс голосования, поэтому можно считать, чтоRocketMQиспользуетPullРежим потребления режима вытягивания.

повтор сообщения

Существует три семантики сообщений:

  • Не более одного раза
  • Хотя бы один раз
  • Ровно раз

Из-за колебаний сети неизбежно, что при передаче сообщения по сети отправитель считает, что первая отправка не удалась, и повторяет попытку отправки, поэтому проблему, которую мы хотим решить, можно понять как:Два одинаковых сообщения, как сделать так, чтобы программа их корректно обработала?

В настоящее время распространены две практики:

1. Гарантированные новостиидемпотентность

2,система сообщенийотфильтровать повторяющиеся сообщения илипотребительФильтровать повторяющиеся сообщения

Идемпотентность:Используйте математические понятия, чтобы углубить понимание, например функциюf(x), x — это сообщение, то независимо от того, сколько раз сообщение будет использовано повторно,f(f(x))Результаты такие же. Не будет побочных эффектов из-за многократного употребления, что гарантирует правильность данных.

Дедупликация сообщений:Это легко понять, каждое сообщение имеет глобально уникальныйMessage ID, доступно в системе сообщенийBrokerфильтр у потребителяConsumerфильтровать в.

В настоящее время во введении, не виделRocketMQсуществуетBrokerФильтрация и дедупликация на месте, поэтому фильтровать нужно на стороне потребителя. Вы можете рассмотреть возможность добавления таблицы базы данных для записи обработанныхMessage ID, если встречается дублирующее сообщение, оно не будет обработано, и сообщение в обработке может быть помещено первымRedis, чтобы не потреблять одно и то же сообщение в одно и то же время~

В-четвертых, детализируйте основные модули

Из предыдущей диаграммы архитектуры мы видим, что есть четыре основных модуля, От сообщения, отправленного производителем, до потребления потребителем были испытаны следующие процессы:

сначала сProducerперспективы, с одним изnsУстановите длинную ссылку, а затем периодически отправляйте пульсации для поддержания статуса, получайтеTopicинформация о маршрутизации темы, затем с помощьюBroker MasterУстановите длинные ссылки и регулярно отправляйте контрольные сообщения, чтобы определить, доступны ли они. По типу отправляемого сообщения определите, нужно лиBrokerВозвращаемое значение.

отConsumerПод углом, сProducerОтличие в том, что это может бытьBroker Masterподписаться на сообщения или изBroker SalveПодпишитесь на новости, и розыгрыш здесь повторяться не будет.

потому чтоRocketMQчистыйJavaязык, так что вы можетеGithubЗагрузите исходный код, чтобы просмотреть подробный дизайн каждого модуля.

Nameserver

NameserverДля управления информацией о подписке на сообщения, отправке и потреблении сообщений каждая служба в кластере должна пройтиNameserverпонять их состояние.

Немного какDubboреестр вZookeeper,NameServerподдерживается вProducerкластер,Brokerкластер,ConsumerСостояние службы кластера. Поддерживайте и обновляйте статус каждой службы, отправляя пакеты пульса через регулярные промежутки времени (по умолчанию 30 секунд).

①, получитьBrokerзапросить, зарегистрироватьсяBrokerинформация о маршрутизации

②、ИнтерфейсClientпросьба, согласноTopicполучить егоBrokerинформация о маршрутизации

NameServerНет состояния, можно масштабировать по горизонтали. каждыйBrokerприбудет при запускеNameServerрегистр;ProducerПеред отправкой сообщения оно будет основываться наTopicприбытьNameServerполучить маршрут (кBroker)Информация;Consumerтакже будет получаться периодическиTopicмаршрутная информация.

оNamesrv, после понимания вышеуказанных понятий вы можете просмотреть конкретный процесс запуска в коде:

org.apache.rocketmq.namesrv.NamesrvStartup#main

Broker

BrokerРасположение сервера хранилища брокера сообщений отвечает за сохранение сообщений и управление ходом их использования.

Представьте его особенности:

① со всемиNamesrvУзел поддерживает длинную связь и пульсацию, а время (по умолчанию 30 с) будетTopicинформация, зарегистрированная наNamesrv.

② Отвечает за хранение сообщений вTopicДля поддержки облегченных очередей одна машина может поддерживать десятки тысяч очередей и поддерживать модель push-pull сообщений.

③ Он может накапливать сотни миллионов сообщений и может строго гарантировать порядок сообщений.

Конкретная запись кода запуска:

org.apache.rocketmq.broker.BrokerStartup#main

В процессе инициализации много чего нужно сделать, например хранение сообщений, удаленные сервисы, фильтры и т.д., на первый взгляд немного кружится голова, и дальше я не слежу. можете перейти к исходному коду, чтобы увидеть =-=

Producer

Отправитель сообщений, основная роль отправки сообщений в очередь сообщений.

На картинке нижеRocketMQTemplateперечислитьconvertAndSendСхема последовательности вызова метода:

Основные шагиDefaultMQProducerImplМетод реализации отправки , нижний слой выбирает тот, на который нужно отправитьMessageQueue, выполнить предварительный хук, передатьNettyClinetотправить запрос, выполнить post-hook после отправки и, наконец, вернутьSendResult.

В частности, его можно найти вSpringBootвведен вRocketMQ-StarterЗависимость, а затем отправьте сообщение для просмотра всей цепочки вызовов.

Consumer

  • Основа идентификации

В потребительской группе можно задать множествоListener, потреблять разныеTopicсообщение ниже.

При использовании сообщений вам необходимо пройтиConsumerGroup + Topic + Tagбыть уникальнымListener.

Поэтому одна и та же группа потребления и одна и та же тема не могут отображаться одинаково.TagизListener, приложение сообщит об ошибке при запуске.

  • Код

Код реализации сProducerВ этом же модуле:client

Прежде чем говорить о потреблении сообщений, давайте посмотрим, что при запуске приложение будет сканироватьRocketMQMessageListenerаннотированныйbeans, а затем зарегистрируйте контейнер

По моему мнению, вforРегистрация отсканирована в циклеbean, затем вcreateRocketMQListenerContainerсобратьDefaultRocketMQListenerContainer, затем вSpringЗарегистрируйтесь в контейнере и дождитесь последующего потребления.

В то же время поток демона постоянноBrokerПоднимите сообщение и используйте его после прослушивания сообщения, которое соответствует условиям:

Выше приведена примерно регистрация при запуске приложенияListenerИ процесс зацикливания для получения сообщения, в частностиConsumerЗапись кода при запуске находится здесь:

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#start

Процесс регистрации потребителей относительно долгий, и вам нужно следить за ним медленно ~

5. Специальное использование (код и пользовательский интерфейс мониторинга)

Код загрузки (внешний и бинарный стартовые пакеты)

  • external: расширенный контент, чтобы получить станцию ​​мониторингаConsoleАдрес следующий:
https://github.com/apache/rocketmq-externals.git
  • install: Стартовый пакет приложения для развертыванияNamesrvа такжеBroker
https://rocketmq.apache.org/dowloading/releases/
  • код реализации программы: Открытый исходный код, внес вкладGithub, вы можете загрузить и упаковать свои собственные идеи использования и ссылки на реализацию, учиться и учиться~
https://github.com/apache/rocketmq

Запустить сервер имен и брокер

Перед запуском убедитесь, что локальная глобальная переменная содержитJAVA_HOMEпеременные, например:

$ echo ${JAVA_HOME}
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home

в загрузкуreleasesКаталог бинарного установочного пакета

$ cd rocketmq-all-4.7.0-bin-release/bin
$ sh paly.sh

Нажмите, чтобы просмотретьplay.shскрипт, вы можете видеть, что он запуститсяNamesrvа такжеBrokerПосле запуска двух служб вы можете перейти кBrokerОтправка и использование сообщений

Стартер с интеграцией Springboot

используется здесьSpringbootинтегрированныйstarterМодуль, вы можете обратиться к этой статье для получения подробной информации:

Woohoo. Бросьте Arlington Terrier.com/Apache-rock…

После личной интеграции введитеDemoизRocketПод содержанием:

GitHub.com/VIP-AU Stories/ Это…

здесь, чтобы сказатьListenerПроцесс обработки потребителя сообщений приложение продолжаетbrokerпопасть под наблюдениеTopicсообщение, а затем найдите соответствующееConsumerПотреблять:

Вы можете проследить ссылку вызова в левой части рисунка выше, чтобы понять общую связь потребителей, потребляющих сообщения.

Начать мониторинг использования пользовательского интерфейса

введите скачанныйexternalпуть, естьrocketmq-consoleкаталог, чтобы предотвратить занятие порта 8080, его необходимо изменить.Есть два конкретных места для изменения:

# UI 监控系统的访问端口
server.port=10010
# Namesrv 的地址,如果有多个,请用分号 ; 分隔开
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=localhost:9876

rocket-consoleЯвляетсяSpringbootПроект после изменения соответствующей конфигурации необходимо упаковать, а затем развернуть.

$ mvn clean install -DskipTests
$ java -jar target/rocketmq-console-ng-1.0.1.jar

Затем войдите в порт, установленный ранее, вы можете увидеть платформу мониторинга:

Панель навигации в верхней части определяет имеющиеся у нее функции, например, желание увидетьTopicкоторомуBroker, а затем подписавшиеся группы потребителей можно просмотреть с помощью мониторинга:

6. План последующего исследования

В повседневном использовании в основном нет необходимости модифицироватьNameserverа такжеBrokerУслуги этих двух модулей больше касаютсяProducerа такжеConsumerиспользовать, в то же время, они могут быть инкапсулированы дважды, чтобы заменитьDefaultMQProducerа такжеMQConsumerРеализуйте классы для создания отправителей и потребителей, которые подходят вашему бизнесу.

Этот обмен представляетRocketMQАрхитектура проекта, дизайн основных модулей, расположение исходного кода, общий процесс отправки и потребления сообщений, а также введениеRocketMQTemplateБазовое использование и мониторингConsole UIТем не менее, я не понял подробного дизайна величины смещения, формата хранения сообщений, синхронизации, метода асинхронной очистки, повторения сообщений и т. Д., Если будет возможность, я узнаю и поделюсь этим позже ~

использованная литература

1,Официальная документация по облачным продуктам Alibaba

2,Глоссарий Alibaba Cloud RocketMQ

3.«Глубокое понимание RocketMQ» — механизм доставки сообщений MQ.

4.Message Middleware Series (9): подробное объяснение дизайна архитектуры RocketMQ, ключевых функций и сценариев приложений.

5.«Легко учиться» — RocketMQ

6.RocketMQ делится полным документальным фильмом

7.Введение в производительность Ali RocketMQ

8,Сравнение RocketMQ и kafka (18 отличий)

9,Режим потребления сообщений RocketMQ Push-pull