Статьи по грамотности RocketMQ

Java

Основная ссылка этого блога:

«Легко учиться» — RocketMQАо Бинг

APACHE-RocketMQ GiteeОфициальная документация RocketMQ

RocketMQ реальный бой и продвинутый GitChat

Прошло много времени с тех пор, как я снова вел блог, хотя я могу найти бесчисленное множество причин, чтобы не вести блог, в конце концов, это все еще одно слово «ленивый». Сегодня я наконец приняла таблетку от ленивого рака и решила написать в блог. Что хорошего представить?Подумав об этом, давайте представим RocketMQ.В конце концов, я написал более 30 блогов, и я не написал блог о MQ. Этот блог относительно простой, он не включает в себя анализ исходного кода, только грамотность.

В чем польза МК

разъединение

Я думаю, что с определенной точки зрения микросервисы способствовали бурному развитию MQ.Первоначально система состояла из N нескольких модулей, и все модули были сильно связаны друг с другом.Теперь, с микросервисами, модуль — это система, а системы определенно есть три распространенных метода взаимодействия, один — RPC, другой — HTTP, а третий — MQ.

асинхронный

Если изначально бизнес был разбит на N шагов, и окончательный результат можно было вернуть пользователю только после его пошаговой обработки, то теперь с MQ сначала обрабатывается наиболее критическая часть, а затем отправляется сообщение в MQ, который непосредственно возвращается пользователю.Хорошо, что касается последнегоШаги обрабатываются медленно в фоновом режиме, это действительно артефакт для улучшения взаимодействия с пользователем.

отсечение пика

Внезапный всплеск объема запросов определенного интерфейса неизбежно окажет большую нагрузку на сервер приложений и сервер базы данных.Теперь, когда MQ доступен, независимо от того, сколько запросов приходит, они будут медленно обрабатываться в фоновом режиме.

Введение в RocketMQ

RocketMQ написан на Java и является промежуточным программным обеспечением для сообщений Alibaba с открытым исходным кодом, которое вбирает в себя многие преимущества Kafka. Kafka также является популярным промежуточным программным обеспечением для сообщений, но Kafka написана на Scala, что не позволяет программистам Java читать исходный код, а программистам Java выполнять какую-либо индивидуальную разработку. Друзья, которые были в контакте с Kafka, знают, что хорошо использовать Kafka нелегко. Условно говоря, RocketMQ намного проще, и RocketMQ имеет благословение Alibaba. Он прошел N двойных 11 тестов и больше подходит для отечественных интернет-компаний. , поэтому он используется внутри страны.В RocketMQ много компаний.

Четыре компонента RocketMQ

image.pngИзображение изgit ee.com/mirrors/ROC…

Вы можете видеть, что RocketMQ в основном состоит из четырех компонентов:

NameServer

  • Служба без сохранения состояния, реестр, развертывание кластера, но взаимодействие данных между узлами NameServer отсутствует.
  • Брокер будет периодически сообщать информацию о маршрутизации темы всем серверам имен. Производитель и потребитель случайным образом выбирают сервер имен для обновления информации о маршрутизации в регулярной теме.
  • Информация о маршрутизации темы принимает возможную согласованность в кластере NameServer.
  • Гарантированный АП.

Broker

  • Сервер RocketMQ используется для хранения и распространения сообщений.
  • Broker будет регулярно сообщать всю информацию о маршрутизации тем, которой он владеет, на NameServer.
  • Брокер имеет две роли: мастер и ведомый.Мастер берет на себя операции чтения (потребления сообщений) и записи (производства сообщений).Если мастер занят или недоступен, ведомый может выполнять операции чтения. BrokerId=0 означает Matser, BrokerId!=0 означает Follower, обратите внимание на два момента:

Во-первых, пока только фолловеры с BrokerId=1 могут выполнять операции чтения; Во-вторых, только более поздняя версия RocketMQ поддерживает автоматическое обновление Последователя до Мастера при зависании Мастер-узла.

Producer

Производитель через регулярные промежутки времени инициирует запрос информации о маршрутизации темы к серверу имен.

Consumer

Потребители инициируют тематические запросы информации о маршрутизации к NameServer через регулярные промежутки времени.

Почему реестр не выбирает Zookeeper

На самом деле в младших версиях RocketMQ в качестве центра регистрации действительно был выбран Zookeeper, но позже он был заменен на текущий NameServer.Основная причина угадывается в том, что:

  • RocketMQ уже является промежуточным программным обеспечением, и я больше не хочу зависеть от другого промежуточного программного обеспечения.
  • Zookeeper относительно тяжелый, и есть много функций, которые RocketMQ не может использовать, поэтому лучше написать легковесный реестр.
  • Zookeeper - это CP. После запуска выборов лидера реестр будет недоступен, в то время как реестр RocketMQ не требует строгой согласованности, если гарантируется окончательная согласованность.

Модель предметной области сообщений RocketMQ

Message

  • переданное сообщение.
  • Сообщение должно иметь тему.
  • Сообщение может иметь несколько тегов и несколько ключей, которые можно рассматривать как дополнительные свойства сообщения.

Topic

  • Коллекция сообщений класса.
  • Каждое сообщение должно иметь тему.
  • Тип сообщения первого уровня.

Tag

  • В дополнение к темам сообщение также может иметь теги, которые используются для разделения различных типов сообщений по одной и той же теме.
  • Тег не требуется.
  • Тип сообщения второго уровня.

Group

Разделенные на ProducerGroup, ConsumerGroup, нас больше беспокоит ConsumerGroup, ConsumerGroup содержит несколько потребителей.

В режиме потребления кластера потребители в ConsumerGroup вместе потребляют тему, и каждому потребителю будет назначено N очередей, но очередь будет использоваться только одним потребителем.Разные группы потребителей могут использовать одну и ту же тему, и сообщение будет потреблено все ConsumerGroups подписались на эту тему.

Queue

  • Тема по умолчанию содержит четыре очереди.
  • В режиме потребления кластера потребители в одной и той же группе потребителей могут получать сообщения из нескольких очередей, но очередь может потребляться только одним потребителем.
  • Сообщения в очереди упорядочены.
  • Он делится на очередь чтения и очередь записи.Вообще говоря, количество очередей чтения совпадает с количеством очередей записи, иначе легко вызвать проблемы.

структура потребления

Существует два режима потребления: Clustering (кластерное потребление) и Broadcasting (широковещательное потребление).

В отличие от других MQ, другие MQ определяют потребление кластера или широковещательное потребление при отправке сообщений RocketMQ устанавливает, следует ли использовать кластерное потребление или широковещательное потребление на стороне потребителя.

Кластеризация (потребление кластера)

По умолчанию используется кластерный режим потребления, в этом режиме все потребители в ConsumerGroup вместе потребляют сообщения из одной темы, каждый потребитель отвечает за потребление сообщений из N очередей (N также может быть равно 1 или даже 0, что не является выделены очереди), но очередь будет потребляться только одним потребителем. Если Потребитель умирает, другие Потребители в группе Потребителей продолжат потреблять вместо приостановленного Потребителя.

В режиме потребления кластера процесс потребления поддерживается на стороне брокера, а путь хранения${ROCKET_HOME}/store/config/ consumerOffset.json,Как показано ниже:image.pngиспользоватьtopicName@consumerGroupNameявляется Ключом, прогресс потребления — Ценностью, а форма Ценности —queueId:offset, указывая на то, что при наличии нескольких ConsumerGroups процесс потребления каждой ConsumerGroup отличается и должен храниться отдельно.

Вещание

Широковещательное сообщение о потреблении будет отправлено всем потребителям в ConsumerGroup.

В режиме широковещательного потребления процесс потребления поддерживается на стороне потребителя.

Алгоритм загрузки очереди потребления и механизм ребалансировки

Алгоритм загрузки очереди потребления

Мы знаем, что в режиме потребления кластера все потребители в ConsumerGroup потребляют сообщения темы вместе, и каждый потребитель отвечает за потребление сообщений из N очередей, так как же он распределяется? Это включает в себя алгоритм загрузки очереди потребления.

RocketMQ предоставляет множество алгоритмов загрузки очереди потребления, два из которых наиболее часто используются: AllocateMessageQueueAveragely и AllocateMessageQueueAveragelyByCircle. Давайте посмотрим на разницу между этими двумя алгоритмами.

Предположим, что тема теперь имеет 16 очередей, представленных q0~q15, и 3 потребителя, представленных c0-c2.

Результаты использования алгоритма загрузки очереди потребления AllocateMessageQueueAveragely следующие:

  • с0: q0 q1 q2 q3 q4 q5
  • с1: д6 д7 д8 д9 д10
  • c2: q11 q12 q13 q14 q15

Результаты использования алгоритма загрузки очереди потребления AllocateMessageQueueAveragelyByCircle следующие:

  • с0: q0 q3 q6 q9 q12 q15
  • с1: д1 д4 д7 д10 д13
  • c2: q2 q5 q8 q11 q14

Все потребители в ConsumerGroup вместе потребляют сообщения темы. Каждый потребитель отвечает за потребление сообщений из N очередей, но очередь не может потребляться N потребителями одновременно. Что это значит?

Вы должны быть достаточно умны, чтобы думать, что если тема имеет только 4 очереди и 5 потребителей, то один потребитель не будет назначен ни на одну очередь, поэтому в RocketMQ количество очередей под темой напрямую определяет максимальное количество потребителей. число, что означает, что скорость потребления не может быть увеличена путем добавления только потребителей.

перебалансировать

Хотя предполагается, что количество очередей следует полностью учитывать при создании темы, реальная ситуация часто неудовлетворительна.Даже если количество очередей не изменится, количество потребителей обязательно изменится, например, онлайн и офлайн потребители, такие как Определенный Потребитель умер, например, был добавлен новый Потребитель. Расширение и сжатие очереди, а также расширение и сжатие Потребителя приведут к перебалансировке, то есть к перераспределению очереди потребления для Потребителя.

В RocketMQ Потребитель будет регулярно запрашивать количество очередей тем, и если количество Потребителей изменится, будет запущена перебалансировка.

Ребалансировка реализована внутри RocketMQ, и программисту не нужно об этом заботиться.

Тянуть ИЛИ толкать?

Вообще говоря, у MQ есть два способа получения сообщений:

  • Вытягивание: потребитель активно извлекает сообщения. Преимущество состоит в том, что потребитель может контролировать частоту и количество извлекаемых сообщений. Потребитель знает свою собственную емкость потребления, поэтому вызвать накопление сообщений на стороне потребителя непросто, но реально Показатели времени не очень хорошие, а эффективность относительно низкая.
  • Push: Брокер активно отправляет сообщения. Преимущество в режиме реального времени и высокой эффективности, но Брокер не может знать потребляемую мощность Потребителя. Если Потребителю отправляется слишком много сообщений, сообщения будут накапливаться на стороне Потребителя; если данных, отправляемых Потребителю, слишком мало. Это также приведет к бездействию на стороне Потребителя.

Будь то Pull или Push, Потребитель всегда будет взаимодействовать с Брокером Обычно существует три способа взаимодействия: короткое соединение, длинное соединение и опрос.

Кажется, что RocketMQ поддерживает как Pull, так и Push, но на самом деле Push также реализован с помощью Pull, так как же Consumer взаимодействует с Broker?

Это гениальная конструкция RocketMQ, это не короткое соединение, не длинное соединение, не опрос, а долгий опрос.

долгий опрос

Потребитель инициирует запрос на получение сообщения, который делится на два случая:

  • Есть сообщение: После того, как Потребитель получает сообщение, соединение разрывается.
  • Нет сообщения: Брокер Удерживать (держать) соединение определенное время, каждые 5 секунд проверять есть ли сообщение, если есть сообщение, отправить его Потребителю, и соединение будет разорвано.

сообщение о транзакции

RocketMQ поддерживает транзакционные сообщения.После того, как Производитель отправит транзакционное сообщение Брокеру, Брокер сохранит сообщение в системной теме:RMQ_SYS_TRANS_HALF_TOPIC, чтобы потребитель не мог использовать это сообщение.

Брокер будет иметь запланированную задачу для потребленияRMQ_SYS_TRANS_HALF_TOPICсообщения, инициируйте обратную проверку для производителя. Существует три статуса обратной связи: фиксация, откат и неизвестно.

  • Если статус проверки отправлен и откатывается, это инициирует отправку и откат сообщения;
  • Если он неизвестен, он будет ждать следующей проверки. RocketMQ может установить интервал проверки и время проверки сообщения. Если время проверки превышает определенное количество проверок, сообщение будет автоматически отброшено.

сообщение с задержкой

Отложенное сообщение означает, что после того, как сообщение отправлено Брокеру, оно не может быть использовано Потребителем немедленно, и ему необходимо подождать определенный период времени, прежде чем его можно будет использовать. RocketMQ поддерживает только определенное время задержки:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.

форма потребления

RocketMQ поддерживает две формы потребления: параллельное потребление и последовательное потребление. Если он потребляется последовательно, необходимо убедиться, что упорядоченные сообщения находятся в одной очереди. Как выбрать очередь для отправки?В RocketMQ есть несколько перегрузок для отправки сообщений, одна из которых поддерживает выбор очереди.

Синхронная чистка, асинхронная чистка

Производитель отправляет сообщение брокеру. Брокер должен сохранить сообщение. RocketMQ поддерживает две стратегии сохранения:

  • Синхронная прокрутка: брокер возвращает ACK производителю после сохранения сообщения.Преимущество заключается в том, что сообщение очень надежно, но эффективность низкая.
  • Асинхронная очистка: брокер записывает сообщение в PageCache и возвращает ACK производителю. Преимущество в том, что эффективность чрезвычайно высока, но если сервер зависнет, сообщение может быть потеряно, если зависнет только сервис RocketMQ, сообщение не потеряется.

Синхронная репликация, Асинхронная репликация

Для надежности и доступности MQ в производственной среде обычно развертываются узлы-последователи, и узлы-последователи будут реплицировать данные мастера. RocketMQ поддерживает две стратегии репликации:

  • Синхронная репликация: как мастер, так и ведомый успешно записывают сообщение, прежде чем вернуть ACK производителю.Надежность высока, но эффективность низкая.
  • Асинхронная репликация: пока мастер записывает успешно, он возвращает ACK производителю, что более эффективно, но может привести к потере сообщений.

Записывается ли «запись» в PageCache или на жесткий диск, зависит от конфигурации Follower Broker.

Давайте снова поговорим о продюсере.

RocketMQ предоставляет три метода отправки сообщений:

  • oneway: выстрелил и забыл, одностороннее сообщение, что означает, что после отправки сообщение игнорируется.Этот метод не имеет возвращаемого значения.
  • Синхронизация: после отправки сообщения дождитесь ответа брокера синхронно.
  • Асинхронный: после отправки сообщение сразу же возвращается, после получения ответа от брокера будет выполнен метод вызова функции.

В реальной разработке обычно используется метод синхронизации.Если производительность RocketMQ должна быть улучшена, параметры на стороне брокера обычно изменяются, особенно стратегия сброса и стратегия репликации.

отправить сообщение повторить попытку

При отправке сообщения, если используется MessageQueueSelector, механизм повторной отправки сообщения будет недействительным.

Существует четыре возможных ответа на отправку сообщения:

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}

Кроме первой, другие ситуации являются проблемными.Чтобы сообщения не терялись, необходимо установить параметры Producer:RetryAnotherBrokerWhenNotStoreOKправда.

Механизм предотвращения сбоев

Если сообщение не удается отправить, оно все равно отправляется этому Брокеру при повторной попытке, тогда высокая вероятность отправки все равно будет неудачной. RocketMQ разработан гениально в том, что он автоматически избегает этого Брокера и выбирает других Брокеров при повторной попытке, но Пока , асинхронная отправка не так умна и будет повторяться только на одном Брокере, поэтому настоятельно рекомендуется выбирать синхронную отправку.

RocketMQ предоставляет два механизма предотвращения сбоев. с параметрамиSendLatencyFaultEnableконтролировать.

  • false: значение по умолчанию, механизм предотвращения сбоев будет включен только при повторной попытке, например, если отправка сообщения BrokerA не удалась, при повторной попытке будет выбран BrokerB, но при следующей отправке сообщения оно все равно будет отправлено Брокеру А.
  • true: механизм задержки задержки включен. Если сообщение не будет отправлено BrokerA, будет пессимистично предположить, что BrokerA будет недоступен в течение определенного периода времени и не будет отправлять сообщения BrokerA в течение определенного периода времени в будущем.

Механизм задержки задержки кажется очень полезным, но, вообще говоря, брокер занят, что приводит к недоступности брокера или недоступности сети.Это только мгновенная вещь, и ее можно немедленно восстановить.Внутри избегали , другие Брокеры были более заняты, и это могло быть хуже.

Давайте снова поговорим о потребителе

Рекомендации для потребителей

Потребитель имеет два параметра: степень параллелизма, которую можно использовать, а именноConsumeThreadMin,ConsumeThreadMax, создается впечатление, что чем меньше сообщений накапливается на стороне потребителя, тем больше число потоков-потребителей.ConsumeThreadMin; Если на стороне потребителя накопилось слишком много сообщений, автоматически запускается новый поток для потребления до тех пор, пока количество потоков-потребителей не будетConsumeThreadMax. Но это не так.Consumer держит внутренний пул потоков и выбирает неограниченную очередь, т. е.ConsumeThreadMaxПараметр недействителен, поэтому в фактической разработкеConsumeThreadMin,ConsumeThreadMaxчасто устанавливаются одинаковые.

ConsumeFromWhere

Если ход потребления не может быть запрошен, где Потребитель начинает потреблять, RocketMQ поддерживает потребление из самого последнего сообщения, самого раннего сообщения и указанной метки времени.

Использовать повтор сообщения

RocketMQ установит имя темы для каждой ConsumerGroup как%RETRY%+consumerGroupОчередь повторных попыток используется для сохранения сообщения, которое необходимо повторить, в ConsumerGroup, но для повторной попытки требуется определенное время задержки.SCHEDULE_TOPIC_XXXXВ очереди задержки фоновая задача синхронизации откладывается на соответствующее время, а затем сохраняется в%RETRY%+consumerGroupв очереди повторных попыток.

Что делать, если новостей навалено, а мощности потребления не хватает?

  • Улучшите прогресс потребления, это лучший способ.
  • Увеличивайте очередь, увеличивайте Потребителя.
  • Первоначальный Потребитель, как посредник, «перемещает» сообщения в несколько новых тем в соответствии с определенными правилами, а затем открывает несколько групп потребителей для использования разных тем.
  • Откройте новую ConsumerGroup для потребления, то есть две ConsumerGroup используют тему одновременно, но вам нужно обратить внимание на оценку смещения.Например, ConsumerGroup использует сообщения с нечетным смещением, а ConsumerGroup использует сообщения с даже смещение.

Я думал, что грамотность написания должна быть очень гладкой, но я все равно слишком много думал, потому что это грамотность, и она нацелена на мелких партнеров, которые не имели большого контакта с RocketMQ, но RocketMQ не так прост, его невозможно использовать блог, Просто дайте друзьям, которые не имели большого контакта с RocketMQ, начать плавно, поэтому, когда я писал блог, я все время думал, важна ли эта вещь и нужно ли ее подробно описывать; эту вещь можно игнорировать, может ли это не быть представленным и т. д., вы можете видеть. Эта статья в основном знакомит с различными концепциями и почти не затрагивает уровень API, потому что, как только API задействован, предполагается, что он не будет завершен через две недели.

End