Три основные организационные структуры MQ
1. КроликMQ
Функции компонентов RabbitMQ
-
Broker:
Экземпляр RabbitMQБрокер -
Virtual Host:
Веб хостинг.Эквивалентная база данных Mysql, На Брокере может быть несколько виртуальных хостов, и виртуальные хосты изолированы друг от друга. Каждый виртуальный хост имеет свои собственные очереди, обмены, привязки и механизмы разрешений. vhost должен быть указан при подключении, vhost по умолчанию — /. -
Exchange:
Обмен, который получает сообщения, отправленные производителями, и направляет их в очереди на сервере. -
Queue:
Очередь сообщений, используемая для хранения сообщений до тех пор, пока они не будут отправлены потребителям.это контейнер для сообщений. Сообщение может быть помещено в одну или несколько очередей. -
Banding:
Связывание отношения дляСвязь между очередями сообщений и обменами. через ключ маршрутизации (Routing Key) связывает обмен с очередью сообщений. -
Channel:
труба, аДвунаправленный канал потока данных. Будь то публикация сообщения, подписка на очередь или получение сообщения, все эти действия выполняются через конвейер. Поскольку установление и уничтожение TCP очень дорого обходится операционной системе, для повторного использования TCP-соединения введена концепция каналов. -
Connection:
TCP-соединение между производителем / потребителем и брокером. -
Publisher:
Производитель сообщения. -
Consumer:
потребитель сообщения. -
Message:
сообщение, которое состоит из заголовка сообщения и тела сообщения. Заголовок сообщения включаетRouting-Key,Priority(приоритет) и т. д.
Несколько типов переключателей для RabbitMQ
Exchange
Для распространения сообщенийQueue
час,Exchange
Типы соответствуют разным стратегиям распространения, и существует 3 типаExchange
:Direct,Fanout,Topic.
-
Direct: в сообщении
Routing Key
И еслиBinding
серединаRouting Key
точно так же,Exchange
Сообщение будет распределено в соответствующую очередь.
-
Fanout: Каждое сообщение, отправленное на коммутатор типа Fanout, будет распределено по всем связанным очередям. Переключатели Fanout не имеют
Routing Key
.Это самый быстрый способ пересылки сообщений среди трех типов коммутаторов..
-
Topic: Тематические переключатели распределяют сообщения посредством сопоставления с образцом,
Routing Key
соответствовать образцу. он распознает только дваподстановочный знак:"#"и"*".#
Соответствует нулю или более словам,*
Сопоставьте 1 слово.
TTL
TTL (Время жить): время жить. RabbitMQ поддерживает время истечения срока действия сообщения, всего 2 типа.
-
Укажите, когда сообщение будет отправлено. Настроив тело сообщения
Properties
, вы можете указать время истечения срока действия текущего сообщения. - Укажите при создании Exchange. Рассчитывается из очереди входящих сообщений, если больше времени ожидания конфигурации очереди, сообщение автоматически очищается.
Механизм подтверждения сообщения производителя
Подтвердить механизм
- Подтверждение сообщения означает, что после того, как производитель доставит сообщение, если брокер получит сообщение, он передаст нам ответ производителю.
- Производитель принимает ответ, чтобы подтвердить, что сообщение отправлено брокеру нормально.Этот метод такжеОсновная гарантия надежной доставки сообщений!
Как реализовать подтверждающее сообщение Confirm?
-
Включить режим подтверждения на канале:
channel.confirmSelect()
-
Включить прослушивание на канале:
addConfirmListener
, отслеживать результаты обработки успешных и неудачных попыток и повторно отправлять сообщение или записывать обработку журнала и другие последующие операции в соответствии с конкретными результатами.
Механизм возврата сообщений
Return ListenerИспользуется для обработки некоторых немаршрутизируемых сообщений.
Наш производитель сообщений отправляет сообщения в определенную очередь, указав обмен и маршрутизацию, а затем наши потребители прослушивают очередь для операций потребления и обработки сообщений.
Но в некоторых случаях, если текущий обмен не существует или указанный ключ маршрутизации не может быть маршрутизирован, когда мы отправляем сообщение, в это время нам нужно отслеживать это недостижимое сообщение, и нам нужно использовать Return Listener.
В базовом API есть ключевой элемент конфигурацииMandatory
: Если true, прослушиватель получит сообщение о недоступности маршрута и обработает его. Если false, брокер автоматически удалит сообщение.
Точно так же, наблюдая,chennel.addReturnListener(ReturnListener rl)
Передайте ReturnListener, который переопределил метод handleReturn.
Потребительские ACK и NACK
Когда потребление потребляется, если бизнес-исключение может регистрировать запись, затем компенсацию. Но нам нужна серьезная проблема, такая как время простоя сервера.Ручной ACKГарантия успеха потребления на стороне потребителя.
// deliveryTag:消息在mq中的唯一标识
// multiple:是否批量(和qos设置类似的参数)
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
Как указано выше, сообщение находится вПотребитель возвращается в очередьЭто возврат сообщения брокеру, если сообщение не было успешно обработано. Как правило, в практических приложениях очередь на повторный вход закрыта (Избегайте входа в бесконечный цикл), то есть установить в false.
Очередь недоставленных сообщений DLX
死信队列(DLX Dead-Letter-Exchange):
Когда сообщение становится недоставленным в одной очереди, оно будет перемещено обратно в другую очередь, которая является очередью недоставленных сообщений.
DLX — это тоже обычный Exchange, ничем не отличающийся от обычного Exchange, его можно указать на любой очереди, то есть фактически задать свойства очереди.
Когда в этой очереди есть недоставленное письмо, RabbitMQ автоматически повторно опубликует сообщение в заданном Exchange, а затем перенаправит его в другую очередь.
2. RocketMQ
Официальный продукт Alibaba для обмена сообщениями для Double Eleven, который поддерживает все службы обмена сообщениями Alibaba Group, подвергался тщательным испытаниям на высокую доступность и высокую надежность в течение более десяти лет и является основным продуктом цепочки транзакций Alibaba.
Ракета: Значение ракеты.
Основные концепции RocketMQ
Он имеет следующие основные понятия:Broker
,Topic
,Tag
,MessageQueue
,NameServer
,Group
,Offset
,Producer
а такжеConsumer
.
Ниже приводится подробное введение.
-
Broker: роль ретранслятора сообщений, ответственная засохранить сообщение, чтобы переслать сообщение.
-
- BrokerЭто сервер, который предоставляет определенные услуги.Один узел-брокер поддерживает длительное соединение и пульсацию со всеми узлами NameServer и периодически отправляетTopicИнформация регистрируется на NameServer, между прочим, базовая связь и соединениеРеализация на основе Nettyиз.
-
- BrokerОтвечает за хранение сообщений, поддерживает облегченные очереди с темами в виде широты, одна машина может поддерживать десятки тысяч очередей и поддерживать модель push-pull сообщений.
-
- Данные показывают, что официальная линия:Возможность накопления сотен миллионов сообщений, а также можетСтрого гарантируем порядок сообщений.
-
-
Topic:тема! Это тип сообщения первого уровня. Например, систему электронной коммерции можно разделить на: новости транзакций, новости логистики и т. д. Сообщение должно иметь Тему.TopicОтношения между производителями и потребителями очень свободны.Тема может иметь 0, 1 или несколько производителей, отправляющих сообщения в нее, и производитель также может отправлять сообщения в разные темы одновременно. Тема также может быть подписана 0, 1 или несколькими потребителями.
-
Tag:Этикетка! Думайте об этом как о подтеме, который является типом сообщения второго уровня, используемым для предоставления пользователям дополнительной гибкости. Используя теги, сообщения с разными целями одного и того же бизнес-модуля могут использовать одну и ту же тему, но разныеTagидентифицировать. Например, сообщения о транзакциях можно разделить на: сообщения о создании транзакций, сообщения о завершении транзакций и т. д. Сообщение может не иметьTag. Теги помогают поддерживать чистоту и согласованность кода, а также могут предоставлятьRocketMQПредоставляемая система запросов помогает.
-
MessageQueue: в теме можно установить несколько очередей сообщений.При отправке сообщения выполняется тема сообщения, и RocketMQ будет опрашивать все очереди в теме для отправки сообщения. Физическая единица управления сообщением. В теме может быть несколько очередей.Введение очередей позволяет распределять и группировать хранение сообщений, а также имеет возможность горизонтального масштабирования.
-
NameServer: аналогично Zookeeper в Kafka, но между кластерами NameServer.нет связиДа, больше, чем ЗКЛегкий. Он в основном отвечает за управление исходными данными, включаяTopicи управление маршрутной информацией. Каждый брокер будет регистрироваться на NameServer при запуске, а Producer будет переходить на NameServer в соответствии с темой перед отправкой сообщения.Получить информацию о маршрутизации соответствующего брокераПотребитель также будет периодически получать информацию о маршрутизации Темы.
-
Producer: Производитель, поддерживает три способа отправки сообщений:Синхронный, асинхронный и односторонний
-
-
单向发送
: после отправки сообщения вы можете продолжить отправку следующего сообщения или выполнить бизнес-код, не дожидаясь ответа сервера инет функции обратного вызова.
-
-
-
异步发送
: после отправки сообщения вы можете продолжить отправку следующего сообщения или выполнить бизнес-код, не дожидаясь ответа сервера.имеет функцию обратного вызова.
-
-
-
同步发送
: после отправки сообщения подождите, пока сервер ответит успешно или с ошибкой, прежде чем продолжить следующие операции.
-
-
-
Consumer: потребитель, поддержка
PUSH
иPULL
Два режима потребления, поддержкаПотребление кластераишироковещательное потребление-
-
集群消费
: в этом режиме кластер потребителей использует несколько очередей темы вместе, и очередь может использоваться только одним потребителем.Если потребитель умирает, другие потребители в группе будут продолжать потреблять вместо приостановленного потребителя.
-
-
-
广播消费
: он будет отправлен каждому потребителю в группе потребителей для потребления. эквивалентноRabbitMQМодель публикации-подписки.
-
-
-
Group: группа, группа может подписаться на несколько тем. Делится на ProducerGroup и ConsumerGroup, которые представляют определенный тип производителей и потребителей.В общем случае, одна и та же служба может использоваться как группа, и одна и та же группа обычно отправляет и потребляет одни и те же сообщения.
-
Offset:существуетRocketMQВсе очереди сообщений являются постоянными структурами данных с бесконечной длиной.Так называемая бесконечная длина означает, что каждая единица хранения в очереди имеет фиксированную длину, а доступ к единице хранения осуществляется с использованием смещения.Смещение - это тип java long, 64 бит, теоретически не переполнится за 100 лет, поэтому считается, что он имеет бесконечную длину. Очередь сообщений также можно рассматривать как массив бесконечной длины,Offsetявляется индексом.
Отложенное сообщение
Версия RocketMQ с открытым исходным кодом не поддерживает произвольную точность времени, а поддерживает только определенные уровни, такие как время 5 с, 10 с, 1 мин и т. д. Среди них уровень = 0 означает отсутствие задержки, уровень = 1 означает задержку уровня 1, уровень = 2 означает задержку уровня 2 и так далее.
Уровни задержки следующие:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Последовательное сообщение
Порядок сообщений означает, что сообщения могут потребляться в том порядке, в котором они были отправлены (FIFO). RocketMQ может строго гарантировать порядок сообщений, который можно разделить на分区有序
или全局有序
.
сообщение о транзакции
Message Queue MQ предоставляет функции распределенных транзакций, аналогичные X/Open XA, а окончательная согласованность распределенных транзакций может быть достигнута с помощью сообщений транзакций очереди сообщений MQ. На приведенном выше рисунке показан общий поток сообщений о транзакциях: отправка и отправка обычных сообщений о транзакциях и компенсационный поток сообщений о транзакциях.
- Отправка и фиксация сообщения о транзакции:
- отправить половину сообщения
- Результат записи ответного сообщения сервера
- Выполнить локальную транзакцию по результату отправки (если запись не удалась, полусообщение в это время невидимо для бизнеса, и локальная логика не выполняется);
- Выполните фиксацию или откат в соответствии с состоянием локальной транзакции (операция фиксации создает индекс сообщения, и сообщение видно потребителям).
- Поток компенсации за сообщения о транзакциях:
- Сообщение с транзакцией не является Compart / Rotacback (Состояние ожидания сообщения), чтобы запустить «Проверка» на сервере;
- Производитель получает ответное сообщение и проверяет статус локальной транзакции, соответствующей ответному сообщению.
- Повторная фиксация или откат в соответствии с локальным статусом транзакции
Среди них фаза компенсации используется для устранения тайм-аута или отказа сообщения Commit или Rollback.
- статус сообщения о транзакции:
Сообщения о транзакциях имеют три состояния: состояние фиксации, состояние отката и промежуточное состояние.
- TransactionStatus.comMittransaction: совершает транзакцию, которая позволяет потребителям потреблять это сообщение.
- TransactionStatus.RollbackTransaction: транзакция отката, что означает, что сообщение будет удалено и не будет разрешено для использования.
- TransactionStatus.Unkonwn: промежуточное состояние, которое представляет необходимость проверки очереди сообщений для определения статуса сообщения.
Механизм высокой доступности RocketMQ
RocketMQ по своей природе распределен и может быть сконфигурирован как master-slave и с горизонтальным расширением.
Роль Broker of the Master поддерживает чтение и запись, а роль Broker of the Slave поддерживает только чтение, то есть Producer может подключаться к Broker роли Master только для написания сообщений; Consumer может подключаться к Broker роли Роль Мастера, либо ее можно подключить к Роли Брокера ведомой для считывания информации.
Высокая доступность потребления сообщений (главный-подчиненный)
В конфигурационном файле Потребителя нет необходимости указывать, читать с Мастера или со Слейва, когда Мастер недоступен или занят, Потребитель будет автоматически переключаться на чтение с Слейва. С механизмом автоматического переключения Потребителей, когда машина с ролью Мастера выходит из строя, Потребитель все еще может читать сообщения от Ведомого, не затрагивая программу Потребителя. Это обеспечивает высокую доступность со стороны потребителя.RocketMQ в настоящее время не поддерживает раб для автоматического передачи на мастер, если ресурсов машины недостаточно и нужно превратить Slave в Master, необходимо вручную остановить Брокер роли Slave, изменить конфигурационный файл и запустить Брокер с новым конфигурационным файлом.
Высокая доступность отправки сообщений (настройка нескольких мастер-узлов)
При создании темы создайте несколько очередей сообщений темы в нескольких группах посредников (компьютеры с одинаковым именем посредника и разными идентификаторами брокера образуют группу посредников), чтобы, когда мастер группы посредников недоступен, мастера других групп по-прежнему доступен, производитель все еще может отправлять сообщения.
репликация master-slave
Если в группе брокеров есть ведущий и подчиненный, сообщения необходимо реплицировать от главного к подчиненному.Существует два метода репликации: синхронный и асинхронный.
-
синхронная репликация: Метод синхронной репликации заключается в том, чтобы дождаться успешной записи ведущего и ведомого устройств, прежде чем сообщать об успешном состоянии записи клиенту. В случае сбоя ведущего все резервные данные остаются у ведомого, а простое восстановление синхронной репликации увеличит задержку записи данных и снизит пропускную способность системы.
-
Асинхронная репликация: Метод асинхронной репликации заключается в том, что пока мастер выполняет успешную запись, он может сообщать клиенту о состоянии успешной записи. В режиме асинхронной репликации система имеет более низкую задержку и более высокую пропускную способность, но в случае сбоя ведущего некоторые данные могут быть потеряны, поскольку они не записываются на ведомое устройство.
通常情况下,应该把Master和Save配置成同步刷盘方式,主从之间配置成异步的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。
балансировки нагрузки
Балансировка нагрузки производителя
На стороне производителя, когда каждый экземпляр отправляет сообщение, по умолчанию онголосованиеВсе очереди сообщений отправляются таким образом, чтобы сообщения попадали в разные очереди равномерно. Поскольку очереди могут быть разбросаны по разным брокерам, сообщения отправляются разным брокерам, как показано на следующем рисунке:
Балансировка потребительской нагрузки
Если количество экземпляров потребителей больше, чем общее количество очередей сообщений,Дополнительные экземпляры-потребители не смогут быть назначены в очередь., он не сможет использовать сообщение и не сможет выполнять роль распределения нагрузки. Следовательно, необходимо контролировать, чтобы общее количество очередей было больше или равно количеству потребителей.
- Режим кластера потребителей — запуск нескольких потребителей обеспечивает балансировку нагрузки потребителей (одинаково разделенные очереди).
- По умолчанию используются амортизированные очереди.: Очередь будет равномерно распределяться между каждым экземпляром в соответствии с количеством очередей и количеством экземпляров, чтобы каждый потребитель мог поровну разделить очередь потребления, как показано на следующем рисунке с 6 очередями и тремя производителями.
- Другой алгоритм усредненияКруговая очередь очередиВ форме каждый потребитель совместно использует очередь сообщений разных мастер-узлов, как показано на следующем рисунке:
Широковещательный режим не является сбалансированным по нагрузке, требуя, чтобы сообщение было доставлено всем экземплярам потребителей в группе потребителей, поэтому не существует такой вещи, как амортизированное потребление сообщений.
очередь недоставленных сообщений
Когда сообщение не может быть использовано, RocketMQ автоматически повторяет попытку сообщения. Если сообщение превышает максимальное количество повторных попыток, RocketMQ сочтет это сообщение проблемой. Но в это время RocketMQ не будет сразу отбрасывать проблемное сообщение, а отправит его в специальную очередь, соответствующую этой группе потребителей: очередь недоставленных сообщений. Имя очереди недоставленных сообщений%DLQ%+ConsumGroup
Очереди недоставленных сообщений обладают следующими свойствами:
- Очередь недоставленных сообщений соответствует идентификатору группы, а не отдельному экземпляру потребителя.
- Если идентификатор группы не генерирует недоставленное сообщение, очередь сообщений RocketMQ не создаст для него соответствующую очередь недоставленных сообщений.
- Очередь недоставленных сообщений содержит все недоставленные сообщения, сгенерированные соответствующим идентификатором группы, независимо от того, к какой теме относится сообщение.
3. Кафка
Kafka — это распределенная, разделенная на несколько реплик,На основе зоопаркаСкоординированная распределенная система обмена сообщениями.
Его самая большая особенность заключается в том, что он может обрабатывать большие объемы данных в режиме реального времени для удовлетворения различных сценариев спроса: таких как системы пакетной обработки на основе Hadoop, системы реального времени с малой задержкой, потоковые механизмы Storm/Spark, журналы Web/Nginx, доступ журналы и службы сообщений ждут, используйтеНаписано на языке Scala. Проект с открытым исходным кодом верхнего уровня, принадлежащий Apache Foundation.
Первый взгляд на архитектурную схему Кафки
Основные концепции Кафки
В Kafka есть несколько основных концепций:Broker
,Topic
,Producer
,Consumer
,ConsumerGroup
,Partition
,Leader
,Follower
,Offset
.
-
Broker: Сообщение Обработка промежуточных программ, узел KAFKA - это брокер, один или несколько брокеров может образовывать кластер кафки
-
Topic: Kafka классифицирует сообщения по темам, и каждое сообщение, публикуемое в кластере Kafka, должно указывать тему.
-
Producer: производитель сообщений, клиент, который отправляет сообщения брокеру.
-
Consumer: потребитель сообщений, клиент, который читает сообщения от брокера
-
ConsumerGroup: каждый потребитель принадлежит к определенной группе потребителей, сообщение может быть получено несколькими различными группами потребителей, но только один потребитель в группе потребителей может использовать сообщение.
-
Partition: Физическая концепция, тема может быть разделена на несколько разделов, а внутренние сообщения каждого раздела упорядочены.
-
Leader: каждый раздел имеет несколько копий, только один является лидером, и лидер является разделом, отвечающим за чтение и запись данных.
-
Follower: Послеведок следует за лидером, все запросы на запись направляются через лидер, изменения данных транслируются всем подписчикам, а подписчики и лидера хранят синхронизированные данные. Если лидер не удается, новый лидер избирается из последователей. Когда последователь и лидер зависают, застряли, или синхронизируйте слишком медленно, лидер удалит последователь от
ISR列表
Удалите его и заново создайте Последователя. -
Offset: Компенсировать. Файлы хранилища kafka названы в соответствии с offset.kafka, Преимущество использования смещения в качестве имени заключается в облегчении поиска. Например, если вы хотите найти адрес 2049, просто найдите файл 2048.kafka.
Тема, Раздел и Брокер могут быть поняты следующим образом:
Тема представляет собой логический набор бизнес-данных. Например, сообщения об операциях, связанные с заказом, помещаются в тему заказа, а сообщения об операциях, связанные с пользователем, помещаются в тему пользователя. сообщения о заказе, вероятно, будут иметь очень большой объем, например, сотни гигабайт или даже уровень ТБ, если вы поместите так много данных на одну машину, возникнет проблема с ограничением емкости, тогда вы можете разделить несколько разделов в теме на хранить в осколках данных, разные разделы могут быть расположены на разных машинах, что эквивалентноРаспределенное хранилище.每台机器上都运行一个Kafka的进程Broker。
Общий контроллер ядра Kafka Контроллер
В кластере Kafka будет один или несколько брокеров, один из которых будет выбран контроллером (Kafka Controller), что можно понимать какBroker-Leader
, который отвечает за управление состоянием всех разделов и реплик во всем кластере.
- когда
Partition-Leader
В случае сбоя реплики контроллер отвечает за выбор новой ведущей реплики для раздела. - При обнаружении изменения в наборе ISR раздела контроллер несет ответственность за уведомление всех брокеров о необходимости обновления информации о своих метаданных.
- Когда тема увеличивает количество разделов, контроллер также несет ответственность за осведомленность о новом разделе другими узлами.
Механизм выбора контроллера
Когда кластер kafka запускается,选举的过程是集群中每个broker都会 尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功
, брокер станет общим контроллером кластера.
Когда брокер роли контроллера выйдет из строя, временный узел zookeeper исчезнет, и другие брокеры в кластере всегда будут следить за временным узлом.Если временный узел исчезнет, они будут конкурировать за повторное создание временного узла, что является выбором механизм, о котором мы упоминали выше. Zookeeper гарантирует, что брокер станет новым контроллером. Брокер с идентификацией контроллера требует на одну ответственность больше, чем другие обычные брокеры.
- Мониторинг изменений, связанных с брокером. Добавьте BrokerChangeListener в узел /brokers/ids/ в Zookeeper для обработки изменений увеличения или уменьшения количества брокеров.
- Мониторинг изменений, связанных с темой. Добавьте TopicChangeListener в узел / Brokers / Topics в ZooKeeper для обработки изменений в сокращениях Topic; добавьте TopicDeletionListener в узел ZooKeeper / Admin / DELETE_TOPICS для обработки действия по удалению Topic.
- Прочтите Zookeeper, чтобы получить всю текущую информацию, связанную с темой, разделом и брокером, и управлять соответствующим образом.. Добавьте PartitionModificationsListener в узел /brokers/topics/ в Zookeeper, соответствующий всем темам, чтобы отслеживать изменения распределения разделов в теме.
- Обновите информацию о метаданных кластера и синхронизируйте ее с другими общими узлами брокера.
Механизм выбора лидера реплики раздела
Когда контроллер обнаружит, что брокер, в котором находится лидер раздела, не работает, он выберет первого брокера из списка ISR (в соответствии с параметром unclean.leader.election.enable=false) в качестве лидера (первый брокер помещается в список ISR первым), может быть реплика с наиболее синхронизированными данными), если параметр unclean.leader.election.enable равен true, это означает, что когда все реплики в списке ISR не работают, лидер может быть выбран из реплик, отличных от списка ISR. Этот параметр может улучшить доступность, но у нового избранного лидера может быть гораздо меньше данных. Есть два условия для того, чтобы копия попала в список ISR:
- Узлы-реплики не могут генерировать разделы и должны иметь возможность поддерживать сеанс с zookeeper и подключаться к сети ведущих реплик.
- Реплика может реплицировать все записи на лидере и не может сильно отставать. (Реплики, отстающие в синхронизации с репликой-лидером, определяются конфигурацией replica.lag.time.max.ms. Реплики, которые не синхронизировались с лидером один раз больше этого времени, будут удалены из списка ISR)
Механизм записи смещения для потребления потребительских сообщений
Каждый потребитель будет регулярно отправлять смещение своего потребительского раздела во внутреннюю тему Kafka: Consumer_offsets. При отправке в прошлом ключом будет ConsumerGroupId+topic+номер раздела, а значением будет текущее значение смещения. Kafka будет периодически очищать сообщения в теме, и, наконец, сохранить последние данные
Поскольку __consumer_offsets может получать много одновременных запросов, Kafka по умолчанию выделяет 50 разделов (может быть установлено с помощью offsets.topic.num.partitions), которые могут противостоять большому параллелизму за счет добавления компьютеров.
Механизм перебалансировки потребителей
средства ребалансировкиЕсли количество потребителей в группе потребителей изменится или количество разделов-потребителей изменится, Kafka перераспределит отношения между потребителями и разделами-потребителями.. Например, если потребитель в группе потребителей зависнет, назначенные ему разделы будут автоматически переданы другим потребителям, а если он снова перезапустится, то часть разделов будет ему возвращена.
Уведомление: перебалансировка предназначена только для случая подписки, в которой не указано потребление раздела.Если раздел указан с помощью метода назначения потребления, кафка не будет выполнять перебалансировку.
Перебалансировку потребителей могут вызвать следующие ситуации:
- Потребители в группе потребителей увеличились или уменьшились
- Динамически добавлять разделы в темы
- Группа потребителей подписывается на другие темы
Во время процесса перебалансировки потребители не могут потреблять сообщения из Кафки, что повлияет на TPS кафка. Если есть много узлов в кластере кафки, такие как сотни, ребалансировка может занять много времени, поэтому старайтесь избегать его в Происходит восстановление пиковых систем.
Процесс ребалансировки выглядит следующим образом
Когда потребитель присоединяется к группе потребителей, потребитель, группа потребителей и координатор группы проходят следующие этапы.
Этап 1: Выбор координатора группы
Координатор группы GroupCoordinator: каждая группа потребителей выбирает брокера в качестве своего собственного координатора группы, который отвечает за мониторинг сердцебиения всех потребителей в этой группе потребителей и определение того, не работает ли он, а затем активирует перебалансировку потребителей. Когда каждый потребитель в группе потребителей запускается, он отправляет запрос FindCoordinatorRequest на узел в кластере kafka, чтобы найти соответствующий координатор группы GroupCoordinator и установить с ним сетевое соединение. Метод выбора координатора группы: Для выбора раздела __consumer_offsets, в который должно быть отправлено смещение, потребляемое потребителем, можно использовать следующую формулу.Посредник, соответствующий лидеру этого раздела, является формулой координатора группы потребителей:hash(consumer group id) % 对应主题的分区数
Второй этап: присоединяйтесь к потребительской группе JOIN GROUP
После успешного поиска GroupCoordinator, соответствующий группе потребления, она входит в стадию присоединения к группе потребления. На этом этапе потребитель отправит жонгрубеперевенно-запрос к Группкоординатуру и обрабатывать ответ. Затем GroupCoordinator выбирает первый потребитель, чтобы присоединиться к группе из группы потребителей в качестве руководителя (координатор потребительских групп) и отправляет информацию о компании потребителей лидеру, а затем лидер будет нести ответственность за разработку плана раздела.
Третий этап (Sync Group)
Лидер-потребитель отправляет SyncGroupRequest координатору группы, а затем координатор группы отправляет схему раздела каждому потребителю, и они будут выполнять сетевое подключение и потребление сообщений в соответствии с брокером-лидером указанного раздела.
Стратегия распределения разделов Consumer Rebalance
Существует три основных стратегии ребалансировки:range
,round-robin
,sticky
.По умолчанию используется стратегия распределения диапазонов..
Предположим, что топик имеет 10 разделов (0-9), и теперь потребляют три потребителя:
диапазонная стратегия:按照分区序号排序分配
, предполагая, что n = количество разделов / количество потребителей = 3, m = количество разделов % количество потребителей = 1, тогда каждому из первых m потребителей выделяется n+1 раздел, а следующим (количество потребителей - m) потребление Каждый из них выделяет n разделов. Например, разделы с 0 по 3 назначаются потребителю, разделы с 4 по 6 назначаются потребителю, а разделы с 7 по 9 назначаются потребителю.
круговая стратегия:轮询分配
Например, раздел 0,3,6,9 к потребителю, раздел 1,4,7 к потребителю, раздел 2, 5, 8 к потребителю
липкая стратегия:Стратегия первоначального распределения похожа на циклическую, но при ребалансировке необходимо гарантировать следующие два принципа.
- Распределение перегородок должно быть максимально равномерным.
- Распределение разделов сохраняется таким же, как и последнее распределение, насколько это возможно.
Когда двое вступают в конфликт,Первая цель предпочтительнее для второй целиотметка. Таким образом, исходная стратегия распределения разделов может быть сохранена в максимальной степени. Например, для случая выделения первого диапазона, если третий потребитель повесит трубку, результат повторного использования стратегии прилипания будет следующим: потребителю1 будет выделено еще 7 в дополнение к исходным 0~3, потребителю2 будет выделено еще одно 7 в дополнение к исходным 4~6 перераспределит 8 и 9
Анализ механизма публикации сообщения производителем
-
метод записи
Производитель использует режим push для публикации сообщений брокеру, и каждое сообщение добавляется к разделу, который принадлежит диску последовательной записи (Последовательная запись на диск более эффективна, чем случайная запись, что обеспечивает пропускную способность kafka.).
-
Маршрутизация сообщений
Когда производитель отправляет сообщение брокеру, он выбирает, в каком разделе его хранить, в соответствии с алгоритмом раздела. Его механизм маршрутизации:
- Если указан раздел, используйте его напрямую;
- Если патишен не указан, но указан ключ, передать
hash(key)%分区数
Вычислить паттерн маршрута.Если ни патишен, ни ключ не указаны, используйте опрос для выбора патишена.
-
процесс записи
- Производитель сначала находит лидера раздела из узла "/brokers/.../state" zookeeper.
- Производитель отправляет сообщение лидеру
- Лидер записывает сообщение в локальный журнал
- Подписчики получают сообщения от лидера и отправляют ACK лидеру после записи в локальный журнал.
- После того, как лидер получает ACK всех реплик в ISR, он увеличивает HW (верхний водяной знак, смещение последней фиксации) и отправляет ACK производителю.
HW и ЛЕО
HW俗称高水位
, сокращение от HighWatermark, возьмите наименьшее значение LEO (log-end-offset) в ISR, соответствующее разделу, в качестве HW, и потребитель может использовать не более того места, где находится HW. Кроме того, у каждой реплики есть HW, а лидер и ведомый ответственны за обновление состояния своего HW.对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费
.Это гарантирует, что, если Broker's Broker недействителен, сообщение все еще может быть получено от нового лидера слона. Для запросов на чтение от внутреннего брокера ограничений на аппаратное обеспечение нет.
хранилище сегментов журнала
Данные сообщений раздела Kafka хранятся в соответствующей папке, названной в честь имени темы + номера раздела.Сообщения хранятся в сегментах раздела, а сообщения каждого сегмента хранятся в разных файлах журнала.Кафка предусматривает, что Максимальный размер лог-файла сегмента — 1 Г. Цель этого ограничения — облегчить загрузку лог-файла в память для работы:
1 # 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件,
2 # 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息
3 00000000000000000000.index
4 # 消息存储文件,主要存offset和消息体
5 00000000000000000000.log
6 # 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,
7 # 如果需要按照时间来定位消息的offset,会先在这个文件里查找
8 00000000000000000000.timeindex
9
10 00000000000005367851.index
11 00000000000005367851.log
12 00000000000005367851.timeindex
13
14 00000000000009936472.index
15 00000000000009936472.log
16 00000000000009936472.timeindex
Это число, например 9936472, представляет собой начальное смещение, содержащееся в файле сегмента журнала, что означает, что в этот раздел было записано как минимум около 10 миллионов фрагментов данных. В Kafka Broker есть параметр log.segment.bytes, который ограничивает размер каждого файла сегмента журнала, максимум — 1 ГБ. Когда файл сегмента журнала заполнен, новый файл сегмента журнала автоматически открывается для записи, чтобы предотвратить слишком большой размер одного файла и повлиять на производительность чтения и записи файла.Этот процесс называется свертыванием журнала. записанный называется активным сегментом журнала.
Наконец, прикрепите карту данных узла zookeeper.
Некоторые проблемы и решения, предложенные MQ
1. Как обеспечить последовательное потребление?
- RabbitMQ: Очередь, соответствующая Потребителю, может быть решена.
-
RocketMQ
- Глобальный порядок: в теме есть только одна очередь сообщений.
- Локальный заказ: в соответствии с алгоритмами маршрутизации, такими как
hash(key)%队列数
Получите индекс маршрутизации, чтобы убедиться, что упорядоченные сообщения направляются в одну и ту же MessageQueue.
-
Kafka:
- Глобальный порядок: В теме есть только один раздел.
- Локальный заказ: в соответствии с алгоритмами маршрутизации, такими как
hash(key)%分区数
Получите индекс маршрутизации, чтобы упорядоченные сообщения нужно было направлять в один и тот же раздел.
2. Как добиться задержанного потребления?
-
RabbitMQ: два варианта
- Очередь недоставленных сообщений + TTL
- Представляем плагин задержки для RabbitMQ
- RocketMQ: Встроенная поддержка отложенных сообщений.
-
Kafka: Действуйте следующим образом
- Создайте тему специально для задержки сообщения
- Создайте нового потребителя для использования этой темы
- сохранение сообщения
- Регулярно открывайте другую ветку, чтобы получить постоянное сообщение и поместить его в тему, которая будет фактически использована.
- Фактический потребитель потребления извлекает сообщение из фактической темы для потребления.
3. Как обеспечить надежную доставку сообщений
-
RabbitMQ:
-
Брокер--> Потребитель: ручной ACK
-
Производитель --> Брокер: два варианта
-
1. 数据库持久化
1.将业务订单数据和生成的Message进行持久化操作(一般情况下插入数据库,这里如果分库的话可能涉及到分布式事务) 2.将Message发送到Broker服务器中 3.通过RabbitMQ的Confirm机制,在producer端,监听服务器是否ACK。 4.如果ACK了,就将Message这条数据状态更新为已发送。如果失败,修改为失败状态。 5.分布式定时任务查询数据库3分钟(这个具体时间应该根据的时效性来定)之前的发送失败的消息 6.重新发送消息,记录发送次数 7.如果发送次数过多仍然失败,那么就需要人工排查之类的操作。
преимущество: Это может гарантировать 100% потерю сообщений.
недостаток: Первый шаг связан с проблемами распределенных транзакций.
-
-
2. 消息的延迟投递
-
流程图中,颜色不同的代表不同的message 1.将业务订单持久化 2.发送一条Message到broker(称之为主Message),再发送相同的一条到不同的队列或者交换机(这条称为确认Message)中。 3.主Message由实际业务处理端消费后,生成一条响应Message。之前的确认Message由Message Service应用处理入库。 4~6.实际业务处理端发送的确认Message由Message Service接收后,将原Message状态修改。 7.如果该条Message没有被确认,则通过rpc调用重新由producer进行全过程。
-
преимущество: По сравнению со схемой персистентности улучшена скорость отклика.
недостаток: Сложность системы немного высока.Если и сообщения терпят неудачу, и сообщения потеряны, механизм подтверждения все равно должен быть компенсирован.
- RocketMQ:
-
Производитель теряет данные
В процессе отправки Сообщения Брокеру Продюсер теряется из-за проблем с сетью, либо Сообщение приходит Брокеру, но есть проблема и оно не сохраняется. В ответ на эту проблему RocketMQ устанавливает для Producer три способа отправки сообщений:
-
同步发送
: Естественно гарантированная надежная доставка -
异步发送
: Вам необходимо настроить реализацию в соответствии с результатом ответа брокера в функции обратного вызова. -
单向发送
: Надежная доставка не может быть гарантирована
-
-
Брокер теряет данные
Брокер получил Сообщение и временно сохранил его в памяти, прежде чем Потребитель смог его воспринять, Брокер повесил трубку.
в состоянии пройти持久化
Установите для решения:
1. Установите постоянство при создании очереди, чтобы брокер сохранял метаданные очереди, но не сохранял сообщения в очереди.
2. Установите для параметра deliveryMode сообщения значение 2, сообщение может быть сохранено на диске, так что подтверждение Producer уведомления будет отправлено только после того, как сообщение будет сохранено на диске.
После этих двух шагов, даже если Брокер зависнет, Продюсер точно не получит акк, поэтому может повторно отправить
- Потребители теряют данные
Потребитель принял сообщение, но возникла внутренняя проблема, и сообщение не было обработано.Брокер считает, что потребитель его обработал и будет отправлять только последующие сообщения. В это время будет关闭autoack,消息处理过后,进行手动ack
, сообщение, которое не удалось использовать несколько раз, попадет в死信队列
, что требует ручного вмешательства.
- Kafka:
-
Производитель теряет данные
уже настроен
acks=all
, оно не будет потеряно.Требование состоит в том, чтобы ваш лидер получил сообщение, и все подписчики синхронизировались с сообщением, прежде чем вы сочтете, что написание этого письма выполнено успешно. Если это условие не выполняется, производитель будет автоматически и непрерывно повторять попытки бесконечное количество раз. -
Брокер теряет данные
Кафка посредником, а затем переизбрание лидера. Вдумайтесь, по прошествии этого времени если у другого фолловера просто какие-то данные не синхронизируются, то в результате на этот раз лидер зависнет, а потом избрание фолловера в лидера, не меньше каких-то данных? Он потерял некоторые данные ах.
В настоящее время обычно требуется установить как минимум следующие четыре параметра:
-
- установить тему
replication.factor
Параметр: это значение должно быть больше 1, что означает, что каждый раздел должен иметь как минимум 2 реплики.
- установить тему
-
- Настройка на сервере Kafka
min.insync.replicas
Параметр: это значение должно быть больше 1. Это требует от лидера, по крайней мере, осознавать, что есть по крайней мере один ведомый, который все еще находится в контакте с самим собой и не отстает, чтобы убедиться, что лидер повесил трубку и есть еще один последователь.
- Настройка на сервере Kafka
-
- Установить на стороне производителя
acks=all
: Это требует, чтобы каждый фрагмент данных был записан во все реплики, прежде чем он будет считаться успешно записанным.
- Установить на стороне производителя
-
- Установить на стороне производителя
retries=MAX
(очень большое значение, означающее бесконечный повтор): это требует бесконечных повторов после сбоя записи, и он застрял здесь.
- Установить на стороне производителя
Наша производственная среда настроена в соответствии с вышеуказанными требованиями.После такой настройки, по крайней мере, на стороне брокера Kafka, можно гарантировать, что при сбое брокера, на котором находится лидер, данные не будут потеряны при переключении лидера.
- Потребители теряют данные
Вы потребляете сообщение, а затем потребитель автоматически отправляет смещение, заставляя Kafka думать, что вы уже использовали сообщение, но на самом деле вы только собираетесь его обработать, и вы вешаете трубку, не успев его обработать. .
Разве это не похоже на RabbitMQ?Все знают, что Kafka будет автоматически отправлять смещения, так что покаОтключите смещение автоматической фиксации и отправьте смещение вручную после обработки, чтобы гарантировать, что данные не будут потеряны.Однако в это время еще может быть повторное потребление.Например, если вы только что закончили обработку и не сдали зачет, а сами повесились, вы обязательно повторите потребление один раз в это время, так что это хорошо для обеспечения идемпотентности.
4. Как обеспечить идемпотентность сообщений?
Взяв RocketMQ в качестве примера, сценарии, в которых сообщения повторяются, перечислены ниже:
1. Дублирование сообщений при отправке
Когда сообщение было успешно отправлено на сервер и сохраняется, происходит сбой в сети или клиент не работает, в результате чего сервер не отвечает клиенту. Если производитель понимает, что сообщение не удалось отправить, и пытается отправить сообщение снова, потребитель получит два сообщения с одинаковым содержимым и одним и тем же идентификатором сообщения.
2. Сообщение повторяется во время доставки
В сценарии потребления сообщения сообщение было доставлено потребителю и бизнес-обработка завершена.Когда клиент отправляет ответ на сервер, сеть отключается. Чтобы гарантировать, что сообщение будет потреблено хотя бы один раз, сервер очереди сообщений версии RocketMQ попытается снова доставить ранее обработанное сообщение после восстановления сети, и потребитель впоследствии получит два сообщения с одинаковым содержимым и тот же идентификатор сообщения.
3.Дублирование сообщений при балансировке нагрузки(включая, помимо прочего, дрожание сети, перезапуск брокера и перезапуск потребительского приложения)
Перебалансировка запускается, когда брокер или клиент версии RocketMQ перезапускает, расширяет или сжимает очередь сообщений, и потребители могут получать повторяющиеся сообщения.
Итак, каково решение? прямо над.
5. Как решить проблему задержки сообщений?
Есть несколько моментов, которые следует учитывать в этом вопросе:
1. Как быстро израсходовать накопившиеся сообщения?
Временно написать консьюмера для рассылки сообщений, а сообщения в бэклоге равномерно распределить по N очередям, при этом одна очередь соответствует одному консьюмеру, что эквивалентно увеличению скорости потребления в N раз.
до исправления:
После модификации:
2. Задержка слишком велика, что приводит к тому, что срок действия некоторых сообщений истекает. Что мне делать?
Пакетная перезагрузка. Когда дела не заняты, например ранним утром, заранее подготовьте программу, найдите потерянную партию сообщений и повторно импортируйте их в MQ.
3. Имеется большое количество отложенных сообщений, а диск MQ переполнен, поэтому новые сообщения не могут прийти, и большое количество сообщений теряется, что делать?
Это не путь. Кто сделал [Потребитель распределения сообщений]. Затем перейдите на второй план, а затем дополните данные ночью.