Push или pull очередей сообщений, как это делают RocketMQ и Kafka?

RocketMQ

Во все времена к людям, способным учиться, не будут относиться плохо

Всем привет, меня зовут да.

Сегодня мы поговорим о режиме push-pull для очередей сообщений, который также является горячей темой на собеседованиях.Например, если вы напишите RocketMQ в своем резюме, вы в основном спросите, принимает ли RocketMQ режим push или режим pull? Это режим вытягивания? У вас нет PushConsumer?

Сегодня мы поговорим о двухтактной модели, а затем посмотрим, как это делают RocketMQ и Kafka.

Двухтактный режим

Во-первых, понятно, о каком шаге очереди сообщений идет речь в режиме push-pull.Режим push-pull относится к взаимодействию между Потребителем и Брокером..

По умолчанию между производителем и посредником существует метод push, то есть производитель отправляет сообщение посреднику, а не посредник активно извлекает сообщение.

Представим, что если Брокеру нужно тянуть сообщения, Продюсер должен сохранять сообщения локально в виде логов, чтобы дождаться пула Брокера.Если производителей много, то надежность сообщения зависит не только от самого Брокера , но и необходимость полагаться на сотни производителей.

Брокер также может полагаться на такие механизмы, как множественные копии, для обеспечения надежного хранения сообщений, но надежность сотен или тысяч производителей немного сложна, поэтому производители по умолчанию отправляют сообщения брокеру.

Поэтому в некоторых случаях лучше быть распределенным, а иногда лучше управляться централизованно.

толкающий режим

Режим push означает, что сообщения передаются от Брокера к Потребителю, то есть Потребитель пассивно получает сообщения, а Брокер доминирует в отправке сообщений.

Давайте подумаем о преимуществах push-режима?

Высокие новости в реальном времени, Брокер может отправить сообщение Потребителю сразу после получения сообщения.

Проще для потребительского использования, Всё просто, подожди, всё равно когда будут новости, её продавят.

Каковы недостатки режима push?

Скорость толчка трудно адаптировать к скорости потребления, целью режима push является отправка сообщений с максимальной скоростью. Когда скорость, с которой производитель отправляет сообщения брокеру, превышает скорость, с которой потребитель потребляет сообщения, потребитель может «взорваться» с течением времени, потому что Его вообще нельзя употреблять. Когда скорость отправки слишком высока, как при DDos-атаке, потребитель глуп.

И скорость потребления у разных потребителей разная. Как Брокеру трудно сбалансировать скорость отправки каждого потребителя. Если вы хотите добиться адаптивной скорости отправки, вам нужно сказать Брокеру, когда вы нажимаете, я не могу сделайте это, вы нажимаете медленно, затем Брокер должен поддерживать состояние каждого потребителя, чтобы подтолкнуть изменения скорости.

Это фактически увеличивает сложность самого брокера.

Таким образом, в режиме push-уведомлений трудно контролировать скорость отправки в зависимости от статуса потребителей, и он подходит для ситуаций, когда количество сообщений невелико, а емкость потребления велика и требует высокой производительности в реальном времени.

режим вытягивания

Режим получения означает, что Потребитель активно запрашивает у Брокера получение сообщений, то есть Брокер пассивно отправляет сообщения Потребителю.

Давайте подумаем о преимуществах режима вытягивания?

Инициатива в модели вытягивания принадлежит потребителям.Потребители могут инициировать запросы на вытягивание в соответствии со своими условиями.. Предполагая, что текущий потребитель чувствует, что он не может потреблять достаточно, он может перестать тянуть в соответствии с определенной стратегией или может тянуть через определенные промежутки времени.

Брокер относительно прост в режиме вытягивания, он сохраняет только сообщения, отправленные производителем.Что касается потребления, то оно, естественно, инициируется потребителем.Когда придет запрос, он выдаст ему сообщение.Откуда начать получать сообщение, и сказать ему, сколько потребителей это берет, это бесчувственный человек-инструмент, если потребитель не придет за ним, это не имеет значения.

Режим извлечения может быть более подходящим для пакетной отправки сообщений., в зависимости от режима отправки вы можете отправить сообщение или можете кэшировать некоторые сообщения, а затем отправить их, но когда вы отправляете, вы не знаете, смогут ли потребители обработать такое количество сообщений одновременно. Режим извлечения более разумен: он может ссылаться на информацию, запрошенную потребителем, чтобы определить, сколько сообщений следует кэшировать, а затем отправлять пакетами.

Каковы недостатки режима вытягивания?

задержка сообщения, в конце концов, это потребитель получает сообщение, но как потребитель узнает, что сообщение прибыло? Таким образом, его можно получать только постоянно, но его нельзя запрашивать очень часто, если слишком часто, он станет потребителями, атакующими брокера. Поэтому вам нужно уменьшить частоту запросов, например, если вы запрашиваете каждые 2 секунды, вы, скорее всего, будете задерживаться на 2 секунды, когда просматриваете сообщение.

сообщение занято запроснапример, занят запрос, сообщение приходит только через несколько часов, затем запрос потребителя недействителен в течение нескольких часов, и он делает бесполезную работу.

Это толчок или тяга?

Видно, что режим проталкивания и режим вытягивания имеют свои преимущества и недостатки Как выбрать?

И RocketMQ, и Kafka выбирают режим вытягивания.Конечно, в отрасли также существуют очереди сообщений, основанные на режиме проталкивания, такие как ActiveMQ.

Я лично считаю, что режим извлечения более подходит, потому что текущая очередь сообщений нуждается в сохранении сообщений, то есть у нее есть функция хранения, ее задача состоит в том, чтобы получать сообщения и сохранять хорошие сообщения, чтобы потребители могли их использовать. сообщения.

Существуют различные типы потребителей.Как брокер, вы не должны иметь тенденцию зависеть от потребителей.Я припас для вас хорошие новости, и вы можете получить их, если хотите.

Хотя Брокер в целом не станет узким местом, потому что бизнес-потребление на стороне потребителя относительно медленное, но Брокер — это, в конце концов, центральная точка, и он может быть максимально облегченным.

Так что даже RocketMQ и Kafka выбрали режим pull, разве их не пугают недостатки режима pull? Боялись, поэтому оперировали волной, сглаживая недостатки тягового режима.

долгий опрос

И RocketMQ, и Kafka используют «длинный опрос» для реализации режима извлечения, давайте посмотрим, как они работают.

Для упрощения ниже я буду описывать сообщения, которые не соответствуют количеству пулов, общему размеру и т.д. как будто сообщения еще нет, и все они все равно не соответствуют условиям.

Долгий опрос в RocketMQ

PushConsumer в RocketMQ на самом деле является методом в режиме извлечения.Это просто похоже на режим push.

Потому что RocketMQ тайно помогает нам запрашивать данные у брокера за кулисами.

В фоновом режиме будет работать поток RebalanceService. Этот поток будет выполнять балансировку нагрузки в соответствии с количеством очередей тем и количеством потребителей в текущей группе потребления. PullRequest, сгенерированный каждой очередью, помещается в очередь блокировки pullRequestQueue. Затем есть еще один поток PullMessageService, который постоянно получает запросы на вытягивание из очереди блокировки pullRequestQueue, а затем запрашивает брокера через сеть, чтобы получить сообщения на вытягивание в квазиреальном времени.

Я не буду вырезать эту часть кода, она просто такая, и позже покажу ее с картинками.

Затем метод processRequest в PullMessageProcessor брокера используется для обработки запроса на получение сообщения. Если есть сообщение, он вернется напрямую. Что, если сообщения нет? Давайте посмотрим на код.

Давайте посмотрим, что делает метод suspendPullRequest.

Поток PullRequestHoldService будет получать запрос PullRequest из pullRequestTable каждые 5 секунд, а затем проверять, меньше ли смещение извлекаемого запроса сообщения, чем максимальное смещение текущей очереди потребления.Если условие истинно, это означает, что есть является новым сообщением, и оно вызовет notifyMessageArriving. Наконец, вызывается метод executeRequestWhenWakeup() объекта PullMessageProcessor, чтобы повторить запрос на обработку сообщения, то есть еще раз, время по умолчанию всего длинного опроса составляет 30 секунд.

Проще говоря, время сообщения будет проверяться раз в 5 секунд, и если оно есть, то будет вызываться processRequest для его повторной обработки. Это не похоже на настоящее время, не так ли? 5 секунд?

Не беспокойтесь, существует также поток ReputMessageService, который используется для непрерывного анализа данных из журнала фиксации и распределения запросов, а также для создания двух типов данных: ConsumeQueue и IndexFile.И также будет операция запроса на пробуждение, чтобы компенсировать такую ​​​​медленную задержку каждые 5 секунд.

Код резать не буду, то есть пишется сообщение и будет вызываться pullRequestHoldService#notifyMessageArriving.

Наконец, позвольте мне нарисовать картинку, чтобы описать весь процесс.

Долгий опрос в Kafka

Как и в Kafka, в запросе на вытягивание есть параметры, которые могут привести к тому, что блок запроса потребителя будет ожидать в режиме «длительного опроса».

Проще говоря, потребитель обращается к брокеру, чтобы получить сообщение, и определяется период ожидания, то есть потребитель запрашивает сообщение и немедленно возвращает сообщение, если оно есть, если нет, потребитель ждет, пока не истечет время ожидания. , а затем снова инициирует запрос сообщения о вытягивании.

И брокер также должен сотрудничать.Если потребитель запрашивает, он немедленно вернет, если есть сообщение.Если сообщения нет, будет установлена ​​​​операция с задержкой, и возврат будет возвращен при выполнении условий.

Давайте кратко взглянем на исходный код, и я вырежу часть кода, чтобы выделить ключевые моменты.

Давайте сначала посмотрим на код на стороне потребителя.

Интерфейс опроса выше должен быть всем знаком, на самом деле прямо из аннотации известно, что он действительно ждет прихода данных или тайм-аута, давайте вкратце посмотрим вниз.

Давайте посмотрим, что представляет собой последний вызов client.poll.

НаконецВызывается селектор, обернутый Kafka, и в конечном итоге будет вызван селектор Java nio (тайм-аут)..

Теперь код на стороне потребителя ясен,Давайте посмотрим, как это делает Брокер..

Запись для Broker для обработки всех запросов на самом деле представлена ​​в предыдущей статье, прямо под методом handle файла KafkaApis.scala, на этот раз главным героем является handleFetchRequest.

Этот метод приходит, и я перехватываю самую важную часть.

На следующем рисунке показана внутренняя реализация метода fetchMessages. Комментарии в исходном коде очень четкие. Вы можете увеличить изображение, чтобы увидеть его.

Название этого чистилища очень интересное.Проще говоря, оно использует колесо времени, упомянутое в моей предыдущей статье, для выполнения задач на время.Например, вотdelayedFetchPurgatory, предназначенный для обработки операций отложенного извлечения.

Давайте кратко подумаем, какие методы необходимо реализовать для этой отложенной операции.Во-первых, построенная отложенная операция должна иметь механизм проверки, чтобы увидеть, прибыло ли сообщение, а затем должен быть метод, который будет выполняться после прибытия сообщения, и должен быть метод выполнения.Метод что делать после завершения, конечно должен быть метод что делать после таймаута.

Эти методы фактически соответствуют DelayedFetch в коде, этот класс наследует DelayedOperation и содержит:

  • Метод isCompleted для проверки выполнения условия
  • Метод, который будет выполняться после выполнения условия tryComplete.
  • Метод, вызываемый после выполнения onComplete
  • Метод, который необходимо выполнить после истечения срока действия onExpiration.

Судить о том, истек ли срок его действия, можно по колесу времени, но вам не терпится увидеть новости, когда он истечет, верно?

Здесь у Kafka тот же механизм, что и у RocketMQ. Он также будет напоминать об этих сообщениях с отложенным запросом, когда сообщение будет написано. Я не буду публиковать конкретный код. Вы можете увидеть два метода в методе ReplicaManager#appendRecords.

Но хоть код и не выложен, картинку еще надо нарисовать.

Резюме

Видно, что и RocketMQ, и Kafka используют механизм «длинного опроса». Конкретный метод заключается в ожидании сообщений через потребителей. Когда есть сообщение, брокер напрямую возвращает сообщение. Если сообщения нет, он принять стратегию отложенной обработки, и чтобы обеспечить своевременность сообщения, когда новое сообщение поступает в соответствующую очередь или раздел, сообщение будет напоминаться, и сообщение будет возвращено вовремя.

Одним словом, потребитель и брокер сотрудничают друг с другом, удерживают, когда запрос на вытягивание не соответствует условиям, избегают многократных частых действий на вытягивание и напоминают вернуться, как только приходит сообщение.

Наконец

В общем, у режима push-pull есть свои преимущества и недостатки, и лично я считаю, что режим pull-down вообще больше подходит для очередей сообщений.

Прочитав эту статью, я полагаю, что интервьюер спрашивал вас толкать или тянуть? Я предлагаю криво улыбнуться ему.


Я да, от мала до миллиарда, увидимся в следующей статье.