Что такое пульсар очереди сообщений следующего поколения

Java очередь сообщений

задний план

Давно об этом думал и давно хотел написать статью о пульсаре, но знаний мало, да и во многих деталях еще не разбираюсь, поэтому перепроверил много информации, и вот наконец-то могу придумай статью.

Pulsar — ​​это промежуточное ПО для сообщений, исходный код которого был открыт Yahoo в 2016 году, а в 2018 году он стал проектом верхнего уровня Apache. В своих предыдущих статьях я написал много статей о другом промежуточном программном обеспечении для сообщений, таком как kafka, RocketMQ и т. д. Если вы не знаете об очередях сообщений, вы можете прочитать мои предыдущие статьи ниже:

В индустрии с открытым исходным кодом уже так много промежуточного программного обеспечения очереди сообщений.Каковы преимущества pulsar как новой силы? С момента своего появления pulsar постоянно сравнивают с другими очередями сообщений (kafka, RocketMQ и т. д.), но идея дизайна pulsar отличается от большинства промежуточного программного обеспечения очередей сообщений высокой пропускной способностью, низкой задержкой, разделением вычислений и хранения. , мультитенантность, удаленная репликация и другие функции, поэтому pulsar также известен как промежуточное ПО для очередей сообщений следующего поколения, и я буду подробно разбирать его по порядку.

принцип пульсарной архитектуры

Общая архитектура не слишком отличается от другого промежуточного программного обеспечения очереди сообщений. Я думаю, вы видели много знакомых терминов. Далее я объясню вам значение этих терминов один за другим.

Глоссарий

  • Производитель: производители сообщений, сообщение будет отправлено брокеру.
  • Потребитель: потребитель сообщений, читает сообщения от брокера клиенту для обработки потребления.
  • Брокер: его можно рассматривать как сервер пульсара. И производитель, и потребитель рассматриваются как клиент. Узел обработки сообщений, брокер пульсара отличается от другого промежуточного программного обеспечения сообщений. Он не имеет состояния и не имеет хранилища, поэтому это может быть неограниченное расширение, которое будет подробно объяснено позже.
  • Букмекер: здесь используется Apache Bookeeper, отвечающий за сохранение всех сообщений.
  • ZK: Как и kafka, pulsar также использует zk для сохранения некоторых метаданных, таких как управление конфигурацией, распределение тем, арендаторы и т. д.
  • Обнаружение службы: в Pulsar это можно понимать как nginx. Он может работать со всем брокером только с одним URL-адресом. Конечно, он также может использовать собственное обнаружение службы. Первоначальный запрос от клиента на чтение, обновление или удаление темы отправляется посреднику, который может не обрабатывать эту тему. Если этот посредник не может обработать запрос темы, он перенаправит запрос посреднику, который может обработать запрос темы.

Будь то kafka, RocketMQ или наш пульсар, самое главное в качестве промежуточного программного обеспечения очереди сообщений, вероятно, делится на три части:

  • Как производитель производит сообщения и отправляет их в соответствующий брокер
  • Как брокер обрабатывает сообщение и будет эффективным и запрос
  • Как Consumer потребляет сообщения

И мы расширим эти три части позже.

Производитель создает сообщения

Давайте кратко рассмотрим, как отправлять сообщения с кодом:

PulsarClient client = PulsarClient.create("pulsar://pulsar.us-west.example.com:6650");

Producer producer = client.createProducer(
                "persistent://sample/standalone/ns1/my-topic");
 
// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
    producer.send("my-message".getBytes());
}

  • Шаг 1: Сначала используйте наш URL-адрес для создания клиента. Этот URL-адрес является адресом обнаружения нашей службы. Если мы используем автономный режим, мы можем установить прямое соединение.
  • Шаг 2: Мы передаем параметр, похожий на URL-адрес, нам нужно передать его только для указания, в какой теме или пространстве имен мы создали:

Формат URL - это: {постоянный | непостоянный}: // арендатор / пространство имен / тема

сочинение значение
persistent/non-persistent Pulsar предоставляет постоянные и непостоянные темы. Если выбрана непостоянная тема, все сообщения сохраняются в памяти. Если брокер перезапустится, все сообщения будут потеряны. Если выбрана постоянная тема, все сообщения будут сохраняться на диске, перезапустите брокер, и сообщения можно будет использовать в обычном режиме.
tenant Как следует из названия, это тенант. Сначала pulsar использовался как промежуточное ПО, используемое всей компанией в Yahoo. Для топика нужно указать несколько слоев, а тенант — это один из слоев.
namespace Пространство имен можно рассматривать как второй уровень, такой как бизнес-группа заказов на платформе электронной коммерции.
topic Имя очереди сообщений
  • Шаг 3. Вызовите метод Send Send message, который также предоставляет метод sendasync для поддержки асинхронной передачи.

Среди трех вышеперечисленных шагов шаги 1 и 2 относятся к нашему этапу подготовки, который используется для создания клиента и создания производителя.Наша основная логика действительно находится в отправке, поэтому здесь я сначала упомяну несколько небольших вопросов, которые вы можете подумать. про другое Что делается в очереди сообщений, а потом сравните с пульсаром:

  • Будет ли он отправлен сразу после того, как мы вызовем send?
  • Если есть несколько разделов, как мне узнать, какому Брокеру я должен отправить?

режим отправки

Выше мы упомянули, что отправка делится на два режима: асинхронный и синхронный, но на самом деле режим синхронизации в пульсаре также является асинхронным режимом, в режиме синхронизации имитируется блокировка обратного вызова для достижения эффекта синхронизации. также используется в kafka. , но в RocketMQ все отправки действительно синхронны и будут запрашиваться непосредственно у брокера.

На основе этого режима пакетная отправка поддерживается как в pulsar, так и в kafka, а также прямая отправка в RocketMQ. Каковы преимущества пакетной отправки? Когда TPS, который мы отправляем, особенно высок, если мы напрямую подключаемся к брокеру каждый раз, когда отправляем, мы можем выполнять много повторяющихся действий, таких как сжатие, аутентификация, создание ссылок и т. д. Например, если мы отправляем 1000 сообщений, мы можем выполнить эту работу 1000 раз, а если отправлять пакетами, то эти 1000 сообщений объединяются в один запрос, который относительно сжимается и аутентифицируется только один раз.

Некоторые студенты могут спросить, не приведут к отправке большого количества времени. На самом деле, это не нужно беспокоиться, время по умолчанию каждые 1 мс отправляет пакет в пульсаре, или когда BatchSize по умолчанию будет отправлен на 1000, частота отправки все или быстро.

Отправить балансировку нагрузки

В очереди сообщений топик обычно расширяется по горизонтали.В pulsar и kafka он называется partition, а в RocketMQ называется queue.По сути, это раздел.Мы можем размещать разные разделы на разных брокерах для достижения эффекта нашего горизонтального расширение. .

Когда мы отправляем, мы можем сформулировать собственную стратегию выбора партиций или можем использовать ее для обучения стратегиям партиций по умолчанию. Когда мы выбираем раздел, как мы определяем, какой раздел соответствует какому брокеру?

Взгляните на картинку ниже:

  • Шаг 1: Вся наша информационная информация о сопоставлении разделов хранится в кеше zk и брокера.
  • Шаг 2: Мы можем получить отношения между разбиением и брокером, запросив брокера, и обновляя время.
  • Шаг 3: Каждая партиция в Pulsar абстрагируется в отдельный producter, это и Kafka, Rocketmq разные.В Kafka наверное выбирается выбрать партицию и потом перейти на батион соответствующий Адресу Брокера, потом Transfer. Pulsar упаковывает каждую партицию в Producer.Если не нужно обращать внимание на то, какой Broker ему соответствует, вся логика находится в коде Producter, и он более чистый.

сжатое сообщение

Сжатие сообщений является одним из средств оптимизации передачи информации. Обычно мы видим, что некоторые большие файлы предоставляются для загрузки в виде сжатого пакета. Мы также можем использовать эту идею в нашей очереди сообщений. 1000 записей могут иметь размер передачи 1М, но после сжатия могут быть только десятки кб, что повышает эффективность передачи нам и брокеру, но при этом наш ЦП тоже приносит убытки. Клиент Pulsar поддерживает несколько типов сжатия, таких как lz4, zlib, zstd, snappy и т. д.

client.newProducer()
    .topic(“test-topic”)
    .compressionType(CompressionType.LZ4)
    .create();

Broker

Далее, давайте поговорим о второй более важной частиBroker, В дизайне Брокера пульсар сильно отличается от всех остальных очередей сообщений, и именно из-за этого отличия он стал его характеристикой.

Разделение вычислений и хранилища

Прежде всего, давайте поговорим о его самой большой особенности: разделении вычислений и хранения. Мы говорили в начале, что Pulsar — ​​это следующее поколение очередей сообщений, что очень выгодно для его архитектурного дизайна. Будь то kafka или RocketMQ, все вычисления и хранилище размещаются на одной машине. У этого режима есть несколько недостатков:

  • Расширенная сложность: когда нам нужно расширить кластер, мы обычно из-за процессора или диска, которые влияют на одну из причин, но нам, возможно, придется подать заявку на конфигурацию процессора и диска, это очень хорошая машина, что приводит к пустой трате ресурсов. И kafka это будет расширено, также необходимо перенести данные, процесс очень сложный.
  • Несбалансированная нагрузка: когда в некоторых разделах слишком много данных, нагрузка на брокеры будет несбалансированной.Как показано на рисунке ниже, если в определенном разделе слишком много данных, это приведет к тому, что брокер (корабль) будет нести слишком много данных. , но другие брокеры могут простаивать

Архитектура разделения вычислений пульсара может очень хорошо решить эту проблему:

  • Для вычислений: то есть наш брокер, который обеспечивает чтение и запись очередей сообщений, не хранит никаких данных, stateless очень дружелюбен к нашему расширению, пока вашей машины достаточно, вы можете использовать его случайно. Расширение брокеров часто подходит для увеличения пропускной способности потребителей.Когда у нас есть предприятия или виды деятельности с высоким трафиком, такие как рекламные акции электронной коммерции, мы можем заранее расширить возможности брокеров.
  • Для хранения: то есть наш букмекер предоставляет только хранилище для очередей сообщений.Если есть требование по количеству сообщений, мы можем расширить букмекер, и нам не нужно мигрировать данные, так что расширение очень удобно.

хранилище сообщений

Анализ существительных:

На картинке выше показана архитектура чтения и письма Bookie.Сначала необходимо ввести несколько существительных:

  • Запись, Запись — это запись, хранящаяся в BookKeeper, которая содержит идентификатор записи, сущность записи и т.п.
  • Реестр можно считать, что реестр используется для хранения записей, а несколько последовательностей записей образуют реестр.
  • Журнал, по сути, является бухгалтерским журналом WAL (упреждающая запись), который используется для хранения журнала транзакций бухгалтера.Файл журнала имеет максимальный размер.По достижении этого размера будет создан новый файл журнала.
  • Журнал записей, файл, в котором хранится запись, книга — это логическое понятие, запись будет сначала агрегироваться по книге, а затем записываться в файл журнала записей. Точно так же журнал записей будет иметь максимальное значение, и после достижения максимального значения будет создан новый файл журнала записей.
  • Индексный файл, индексный файл книги, запись в книге записывается в файл журнала записей, индексный файл используется для индексации каждой книги в файле журнала записей, записи места хранения каждой книги в журнале записей и данных. в журнале записей Длина в файле журнала.
  • Хранилище метаданных, хранилище метаданных, используется для хранения метаданных, связанных с букмекерской конторой, например, какие бухгалтерские книги находятся в букмекерской конторе, а бухгалтер в настоящее время использует хранилище zk, поэтому перед развертыванием бухгалтерии требуется кластер zk.

Процесс написания общей архитектуры:

  • Шаг 1: Брокер инициирует запрос на запись и сначала записывает WAL на диск журнала. Друзья, знакомые с mysql, знают redolog. Журнал и redolog используются для восстановления данных, которые не являются постоянными.
  • Шаг 2: Затем запишите данные в индекс и бухгалтерскую книгу.Для поддержания производительности диск не будет записываться напрямую, но будет записываться кэш страниц, а затем диск будет обновляться асинхронно.
  • Шаг 3: подтвердите запись.

Процесс чтения таков:

  • Шаг 1: Сначала прочитайте индекс, конечно, сначала прочитайте кэш, а затем перейдите к диску.
  • Шаг 2: После получения индекса перейдите к регистратору записей, чтобы получить соответствующие данные в соответствии с индексом.

Как эффективно читать и писать?

В kafka, когда у нас есть больше тем, потому что у kafka есть одна тема и один файл, наш дисковый ввод-вывод изменится с последовательной записи на случайную запись. В RocketMq, несмотря на то, что несколько тем соответствуют одному файлу записи, так что запись становится последовательной записью, но наше чтение может легко привести к обновлению нашего кэша страниц различными перезаписями, что оказывает большое влияние на наш ввод-вывод. Таким образом, pulsar сделал много оптимизаций для этих проблем как при чтении, так и при записи:

  • Процесс записи: последовательная запись + кэширование страниц. В процессе записи все наши файлы являются независимыми дисками, а синхронно сбрасывается только Journal, Journal записывает файл journal-wal последовательно, и эффективность последовательной записи очень высока. Хотя и в реестре, и в индексе есть несколько файлов, мы асинхронно записываем только в кэш страниц и на диск, поэтому случайная запись не повлияет на нашу производительность.
  • Процесс чтения: кеш брокера + кеш букмекера. В пульсаре он очень дружелюбен к хвостовому чтению и в основном не использует io. В общем, наш потребитель сразу получит сообщение, отправленное производителем, поэтому эта часть является постоянной. После этого, он все равно существует как кеш в брокере.Конечно, даже если у брокера нет кеша (например, брокер только что создан), у нашего букмекера тоже будет свой кеш в memtable, что сокращает процесс чтения и io через несколько кешей.

Мы можем обнаружить, что в идеальном случае чтение и запись io полностью изолированы, поэтому в Pulsar легко поддерживать миллионы тем, но в наших kafka и RocketMQ это очень сложно.

Неограниченное потоковое хранилище

Тема на самом деле является потоком бухгалтерских книг (сегментом). Благодаря такому дизайну Pulsar не является простой системой очереди сообщений. Он также может заменить систему потоковой передачи, поэтому ее также называют собственной потоковой платформой, которая может заменить такие системы, как flink.Мы можем видеть поток событий (тема / раздел), множество памяти сегмента, но каждый сегмент, состоящий из входа, это можно рассматривать как сообщение, отправленное нашу партию, как правило, как запись.

Сегмент можно рассматривать как базовое измерение, которое мы записываем в файл.Данные одного и того же сегмента будут записаны в один и тот же файл, разные сегменты будут разными файлами, а сегменты между сегментами будут сохранены в метаданных.

Многоуровневое хранилище

В kafka и RocketMQ сообщения будут иметь определенное время хранения, так как диск будет иметь ограничения по объему, эта функция также предусмотрена в pulsar, но если вы хотите хранить свои сообщения постоянно, вы можете использовать иерархическое хранилище, мы можем поставить некоторые более старые данные регулярно обновляются в дешевом хранилище, таком как s3, тогда мы можем хранить наши очереди сообщений бесконечно долго.

репликация данных

Репликация данных в pulsar сильно отличается от kafka и RocketMQ. В других очередях сообщений другие реплики обычно активно синхронизируются, и обычно это время становится непредсказуемым. В pulsar аналогичный протокол qurom используется для предоставления группе доступного пула букмекеров а затем одновременно написать часть букмекера, пока возвращается частичный успех (обычно больше 1/2).

  • Размер ансамбля (E) определяет размер букмекерского пула, доступного для данной книги.
  • Напишите размер кворума (QW) Определяет количество букв, в которые пульсар пишет записи.
  • Ack Quorum Size (Qa) определяет количество букмекеров, которые должны быть подтверждены для записи.

Использование этого метода одновременной записи сделает репликацию данных более эффективной, особенно при наличии большого количества копий данных.

Consumer

Далее поговорим о последнем важном компоненте пульсараconsumer.

модель подписки

Режим подписки используется для определения того, как наши сообщения распространяются среди разных потребителей.Разное промежуточное программное обеспечение очереди сообщений имеет свой собственный режим подписки.Как правило, наши общие режимы подписки:

  • Кластерный режим: сообщение может быть использовано потребителями только в одном кластере.
  • Широковещательный режим: сообщение может быть использовано всеми потребителями в кластере.

В pulsar предусмотрено четыре режима подписки, а именно эксклюзивный, аварийное восстановление, совместное использование и совместное использование ключей:

  • Эксклюзивный: как следует из названия, им может владеть только один потребитель. Если в том же кластере есть второй потребитель для регистрации, второй не будет работать. Это относится к глобально упорядоченным сообщениям.
  • Аварийное восстановление: расширенная версия является эксклюзивной.Если эксклюзивная версия выйдет из строя, она автоматически переключится на другого хорошего потребителя, но она может быть эксклюзивной только для одного.
  • Общий режим: этот режим немного похож на кластерный режим, и сообщение может быть использовано потребителями только в одном кластере, но, в отличие от RocketMQ, RocketMQ разделен на разделы, и данные одного и того же раздела будут отправлены на одну машину. Потребление в Pulsar не будет зависеть от размера раздела, а будет обучать всех потребителей отправлять сообщения по очереди. Какая от этого польза? Если у вас есть 100 машин, но у вас есть только 10 разделов, вам нужно запустить только 10 потребителей, но в pulsar 100 машин можно использовать для обработки потребителей.
  • Совместное использование ключей: аналогично размеру раздела, упомянутому выше, последовательные сообщения одного и того же ключа в RocketMQ будут отправляться в раздел, но здесь не будет измерения раздела, а будет выделено только фиксированному потребителю в соответствии с хэшем ключа. , Решите проблему, заключающуюся в том, что потребительские возможности ограничены количеством разделов.

Режим получения сообщений

Будь то в kafka или в RocketMQ, мы все являемся клиентами, регулярно опрашивающими наших брокеров для получения сообщений.Этот режим называется режимом Long-Polling. Этот режим имеет недостаток, заключающийся в том, что сетевые накладные расходы относительно велики.Давайте рассчитаем задержку потребляемого потребителя.Мы предполагаем, что сетевая задержка между брокером и потребителем равна R, тогда наше общее время равно:

  • Когда некое сообщение A только что прибыло к брокеру, long-polling только что закончил упаковку данных и вернул их, и время, когда брокер возвращается к потребителю, равно R.
  • Потребитель снова отправляет запрос запроса, который снова является R.
  • Верните наше сообщение A потребителю здесь снова R.

Если мы рассмотрим только сетевую задержку, мы увидим, что задержка потребления нашего сообщения составляет около 3R, поэтому мы должны придумать что-то для ее оптимизации.Некоторые студенты могут сразу подумать, что наше сообщение будет передано непосредственно нам.Потребитель не правильно, на этот раз у нашей задержки будет только один R, это наш общий режим push, а вот простой режим push проблематичен, если у нас скорость добычи намного больше скорости потребления, то push Новость точно высохнет вспомню, это обратное давление. Так как же решить противодавление? Мы можем оптимизировать метод push и превратить его в динамический push.Мы объединяем Long-polling, чтобы информировать брокера об оставшемся пространстве в буфере, когда выполняется запрос long-polling, а брокер отвечает за отправку данных. В это время Брокер знает, какое максимальное количество фрагментов данных может быть передано, поэтому он может контролировать поведение при отправке, чтобы не перегружать Потребителя.

Например:

Когда Потребитель инициирует запрос, оставшаяся емкость буфера составляет 100, а Брокер возвращает максимум сообщений 32 за раз, Затем Брокер выполнит 3 нажатия (всего 96 сообщений) для этого запроса с длительным опросом Потребителю и вернуть ответ Потребителю (ответ содержит 4 сообщения).

Если используются длинномую модель, каждый раз, когда потребитель отправляет запрос в брокеру для выполнения ответа, этот пример требует 4 взаимодействия длительного опроса (4 запроса и 4 ответа в общей сложности 8 сетевых операций; динамический толчок / тяга 1 Запрос, три толкания и один ответ, в общей сложности 5 сетевых операций).

Поэтому pulsar использует этот режим получения сообщений для дальнейшей оптимизации времени поступления сообщений от потребительского уровня. Я думаю, что эта конструкция очень изобретательна, и режим длительного опроса многих промежуточных программ можно улучшить, обратившись к этой идее.

Суммировать

Многие идеи дизайна Apache Pulsar отличаются от других промежуточных программ, но это, несомненно, ближе к будущему.Я смело предсказываю, что будущее развитие некоторых других промежуточных программ сообщений также будет приближаться к нему.В настоящее время отечественные пользователи Pulsar также получают больше и многое другое Многие, Tencent Cloud предоставляет облачную версию TDMQ pulsar. Конечно, некоторые другие известные компании, такие как Huawei, Zhihu, Huya и т. д., предпринимают пошаговые попытки. Я считаю, что pulsar действительно тенденция. В конце концов, это также напомнило мне фразу из недавнего финала большой реки:

Все изменения могут сопровождаться болью и окольными путями.Открытая дорога не будет широкой и гладкой, но стремительное поступательное течение больших рек и рек не заблокируют ни пороги, ни рифы. Где Дао, хотя ко мне идут миллионы людей.

На самом деле я говорил здесь только о некоторых общих чертах, а более подробно вы можете ознакомиться со следующими учебными справочными материалами:

  • Прежде всего, вы можете пойти в документацию официального сайта Pulsar, чтобы сначала понять план.
  • Вы также можете обратить внимание на публичный аккаунт пульсара, и я буду каждый день публиковать несколько статей, связанных с пульсарами, которые, я думаю, очень хорошо написаны.
  • Вы можете отправиться на станцию ​​B, чтобы найти TGIP.Это член команды проекта, о котором pulsar будет рассказывать каждую неделю.Если вы хотите узнать, вы можете посмотреть это видео.
  • push or pull?: Блог Woohoo.cn на.com/Hz mark/afraid/currently…
  • Промежуточное ПО для обмена сообщениями для архитектурных решений - Pulsar:blog.CSDN.net/Sweet and Sour Fish 83/art IC…

Если вы считаете, что эта статья полезна для вас, то ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O: