Глубокое погружение в Кафку

задняя часть Kafka
Глубокое погружение в Кафку

Эта статья подготовлена ​​технической командой OPPO Internet, укажите автора для перепечатки. В то же время приглашаем обратить внимание на нашу общедоступную учетную запись: OPPO_tech, чтобы поделиться с вами передовыми интернет-технологиями и деятельностью OPPO.

Kafka — это распределенная система публикации и обмена сообщениями на основе подписки с высокой пропускной способностью, высокой отказоустойчивостью, высокой надежностью и высокой производительностью, которая в основном используется в таких сценариях, как развязка приложений, сокращение пикового трафика и асинхронный обмен сообщениями.

Чтобы дать каждому более глубокое понимание внутреннего принципа реализации Kafka, в этой статье будут представлены хранение, удаление и извлечение сообщений из темы и журнала, затем представлен принцип реализации его механизма копирования и, наконец, представлена ​​реализация принцип производства и потребления, чтобы быть более разумным.применительно к реальному бизнесу.

Кроме того, эта статья длинная, рекомендуется ставить лайк и читать медленно :)

1. Введение

Kafka — это система распределенных публикаций и обмена сообщениями на основе подписки с мощными возможностями обработки сообщений.По сравнению с другими системами обмена сообщениями Kafka обладает следующими функциями:

  • Быстрое сохранение данных, который реализует возможность сохранения данных с временной сложностью O(1).
  • высокая пропускная способность, может достичь пропускной способности 10 Вт в секунду на обычном сервере.
  • Высокая надежность, сохранение сообщения и механизм системы копирования обеспечивают надежность сообщения, и сообщение может быть использовано несколько раз.
  • Высокое расширение, как и в других распределенных системах, все компоненты поддерживают распределенную и автоматическую балансировку нагрузки, что позволяет быстро и легко расширять систему.
  • Вычислительная мощность в автономном режиме и в режиме реального времени, который обеспечивает возможности обработки сообщений в режиме онлайн и офлайн.

Именно из-за этих превосходных характеристик он широко используется в таких сценариях, как развязка приложений, сглаживание пиков трафика и асинхронные сообщения, такие как промежуточное программное обеспечение сообщений, агрегация журналов, потоковая обработка и т. д.

В этой статье мы познакомим Кафку со следующих аспектов:

  1. В первой главе кратко представлены характеристики и преимущества Kafka как распределенной системы публикации сообщений и подписки.

  2. Во второй главе представлены темы и журналы системы kafka, а также рассказывается, как сообщения хранятся, извлекаются и удаляются.

  3. В третьей главе представлен механизм реплик kafka, чтобы понять, как добиться высокой надежности сообщений внутри kafka.

  4. В четвертой главе будет представлен алгоритм разделения сообщения и конкретная реализация идемпотентной функции со стороны вывода сообщения.

  5. В пятой главе будет понятна конкретная реализация группы потребления, механизма смещения потребления и перебалансировки с потребительской стороны сообщения.

  6. Последняя глава кратко резюмирует эту статью

2. Темы и журналы

2.1 Темы

Тема — это логическое понятие для хранения сообщений, которое можно просто понимать как набор сообщений, созданных пользователем. Тема в Kafka обычно имеет несколько подписчиков, которые потребляют сообщения соответствующей темы, а также может быть несколько производителей, которые пишут сообщения в тему.

Каждая тема может быть разделена на несколько разделов, и в каждом разделе хранятся разные сообщения. Когда сообщение добавляется в раздел, ему присваивается смещение смещения (увеличивающееся от 0), и оно гарантированно уникально в разделе Порядок сообщений в разделе гарантируется смещением, т. е. сообщения в одном и том же разделе упорядочены, как показано ниже.

Разные партиции одной темы будут размещены на разных нодах (брокерах), а кластер Kafka гарантированно будет иметь базу горизонтального расширения при партиционировании.

при условииnginx_access_logНапример, количество разделов равно 3, как показано на рисунке выше. Раздел логически соответствует журналу и физически соответствует папке.

drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-0/
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-1/
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-2/

Когда сообщение записывается в раздел, оно фактически записывается в ту же папку, что и раздел. Журнал разбит на несколько сегментов (Segment), каждый сегмент состоит из лог-файла и индексного файла, а размер каждого сегмента ограничен (в конфигурационном файле кластера kafkalog.segment.bytesКонфигурация, по умолчанию 1073741824 байта, то есть 1 Гб).Когда размер шарда превысит лимит, будет пересоздан новый шард, а запись внешних сообщений будет производиться только в последний шард (последовательный ввод-вывод).

-rw-r--r--  1 root root    1835920 10月 11 19:18 00000000000000000000.index
-rw-r--r--  1 root root 1073741684 10月 11 19:18 00000000000000000000.log
-rw-r--r--  1 root root    2737884 10月 11 19:18 00000000000000000000.timeindex
-rw-r--r--  1 root root    1828296 10月 11 19:30 00000000000003257573.index
-rw-r--r--  1 root root 1073741513 10月 11 19:30 00000000000003257573.log
-rw-r--r--  1 root root    2725512 10月 11 19:30 00000000000003257573.timeindex
-rw-r--r--  1 root root    1834744 10月 11 19:42 00000000000006506251.index
-rw-r--r--  1 root root 1073741771 10月 11 19:42 00000000000006506251.log
-rw-r--r--  1 root root    2736072 10月 11 19:42 00000000000006506251.timeindex
-rw-r--r--  1 root root    1832152 10月 11 19:54 00000000000009751854.index
-rw-r--r--  1 root root 1073740984 10月 11 19:54 00000000000009751854.log
-rw-r--r--  1 root root    2731572 10月 11 19:54 00000000000009751854.timeindex
-rw-r--r--  1 root root    1808792 10月 11 20:06 00000000000012999310.index
-rw-r--r--  1 root root 1073741584 10月 11 20:06 00000000000012999310.log
-rw-r--r--  1 root root         10 10月 11 19:54 00000000000012999310.snapshot
-rw-r--r--  1 root root    2694564 10月 11 20:06 00000000000012999310.timeindex
-rw-r--r--  1 root root   10485760 10月 11 20:09 00000000000016260431.index
-rw-r--r--  1 root root  278255892 10月 11 20:09 00000000000016260431.log
-rw-r--r--  1 root root         10 10月 11 20:06 00000000000016260431.snapshot
-rw-r--r--  1 root root   10485756 10月 11 20:09 00000000000016260431.timeindex
-rw-r--r--  1 root root          8 10月 11 19:03 leader-epoch-checkpoint

Сегмент содержит несколько лог-файлов с разными суффиксами. В качестве базового смещения сегмента будет использоваться смещение первого сообщения в сегменте. Фиксированная длина смещения — 20. Если недостаточно для заполнения 0, то как имя файла индексного файла, а также файла журнала, например00000000000003257573.index,00000000000003257573.log,00000000000003257573.timeindex, файлы с одинаковым именем образуют сегмент (без учета суффикса имени), за исключением.index,.timeindex,.logФайлы журналов, отличные от файлов журналов с суффиксом, имеют следующие значения:

тип файла эффект
.index Файл индекса смещения записывает отношение сопоставления , где относительное смещение представляет первое сообщение сегмента, начиная с 1, а начальный адрес представляет соответствующее сообщение об относительном смещении в файле shard.log, начиная с адрес
.timeindex Файл индекса временной метки, запись отношения сопоставления
.log лог-файл, в котором хранятся детали сообщения
.snaphot файл моментального снимка
.deleted Когда файл сегмента удаляется, суффикс .delete будет добавлен ко всем файлам сегмента, а затем будет добавлен суффикс .delete.delete-fileЗадача задерживает удаление этих файлов (в файле file.delete.delay.ms можно задать время отложенного удаления)
.cleaned Очистить журнал временных файлов
.swap Временные файлы после сжатия журнала
.leader-epoch-checkpoint

2.2 Индекс журнала

Первое знакомство.indexфайл, здесь с файлом00000000000003257573.indexНапример, во-первых, мы можем просмотреть содержимое индексного файла с помощью следующей команды.Мы видим, что структура вывода - .На самом деле, индексный файл хранит не смещение, а относительное смещение, например первое сообщение. Относительное смещение равно 0, а смещение ссылки добавляется при форматировании вывода. Как показано на рисунке выше, указывает, что относительное смещение фрагмента равно 114, а его смещение равно 3257573+114. , то есть 3257687, position Указывает, что соответствующее смещение находится в.logФизический адрес файла через.indexИндексный файл может получить физический адрес соответствующего смещения. Индекс создается с помощью разреженного индексирования, и это не гарантирует, что каждое сообщение в сегменте имеет отношение сопоставления в файле индекса (.timeindexИндекс аналогичен), в основном для экономии места на диске и в памяти, потому что файл индекса в конечном итоге будет отображаться в памяти.

# 查看该分片索引文件的前10条记录
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index
offset: 3257687 position: 17413
offset: 3257743 position: 33770
offset: 3257799 position: 50127
offset: 3257818 position: 66484
offset: 3257819 position: 72074
offset: 3257871 position: 87281
offset: 3257884 position: 91444
offset: 3257896 position: 95884
offset: 3257917 position: 100845
# 查看该分片索引文件的后10条记录
$ bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |tail -n 10
offset: 6506124 position: 1073698512
offset: 6506137 position: 1073702918
offset: 6506150 position: 1073707263
offset: 6506162 position: 1073711499
offset: 6506176 position: 1073716197
offset: 6506188 position: 1073720433
offset: 6506205 position: 1073725654
offset: 6506217 position: 1073730060
offset: 6506229 position: 1073734174
offset: 6506243 position: 1073738288

Например, просмотрите смещение как6506155Сообщение: Сначала найдите соответствующий фрагмент по смещению, фрагмент, соответствующий 65061, является00000000000003257573, а затем дихотомией в00000000000003257573.indexНайдите в файле наибольшее значение индекса не больше 6506155, получите , затем из00000000000003257573.logПозиция 1073707263 начинает последовательно сканироваться, чтобы найти сообщение со смещением 650155.

Начиная с версии 0.10.0.0, в Kafka добавлен новый файл журнала сегментов..timeindexИндексный файл, который может находить сообщения на основе меток времени. Точно так же мы можем использовать скриптkafka-dump-log.shПросмотр содержимого файла, проиндексированного по времени.

# 查看该分片时间索引文件的前10条记录
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570792689308 offset: 3257685
timestamp: 1570792689324 offset: 3257742
timestamp: 1570792689345 offset: 3257795
timestamp: 1570792689348 offset: 3257813
timestamp: 1570792689357 offset: 3257867
timestamp: 1570792689361 offset: 3257881
timestamp: 1570792689364 offset: 3257896
timestamp: 1570792689368 offset: 3257915
timestamp: 1570792689369 offset: 3257927

# 查看该分片时间索引文件的前10条记录
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |tail -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570793423474 offset: 6506136
timestamp: 1570793423477 offset: 6506150
timestamp: 1570793423481 offset: 6506159
timestamp: 1570793423485 offset: 6506176
timestamp: 1570793423489 offset: 6506188
timestamp: 1570793423493 offset: 6506204
timestamp: 1570793423496 offset: 6506214
timestamp: 1570793423500 offset: 6506228
timestamp: 1570793423503 offset: 6506240
timestamp: 1570793423505 offset: 6506248

Например, я хочу увидеть метку времени1570793423501Начальное сообщение: 1. Сначала найдите осколок, установите1570793423501Сравните с максимальной отметкой времени каждого сегмента (максимальная отметка времени — это время последней записи файла индекса времени, если время равно 0, берется время последней модификации сегмента журнала), пока оно не станет больше или равно1570793423501, поэтому файл индекса времени находится00000000000003257573.timeindex, чья максимальная временная метка15707934235052. Найти больше или равно по дихотомии1570793423501, то есть (6506240 — смещение, а относительное смещение — 3247667); чем относительное смещение 4. Из лог-файла00000000000003257573.logНачать сканирование с позиции 1073734174, найти не менее1570793423501Данные.

2.3 Удаление журнала

В отличие от другого промежуточного программного обеспечения сообщений, сообщения в кластере Kafka не будут удаляться из-за потребления или нет.Как и журнал, сообщение в конечном итоге будет помещено на диск, и предусмотрена соответствующая периодичность политики (через параметр log.retention. check.interval.ms установить, по умолчанию 5 минут) для выполнения операций удаления или сжатия (файл конфигурации брокераlog.cleanup.policyЕсли параметр "delete", выполняется операция удаления, а если "compact", выполняется операция уплотнения, и по умолчанию "delete").

2.3.1 Удаление журнала по времени

параметр По умолчанию инструкция
log.retention.hours 168 Время хранения журнала (часы)
log.retention.minutes никто Время хранения журнала (минуты), приоритет выше, чем часы
log.retention.ms никто Время хранения журнала (миллисекунды), приоритет выше минут

Когда время хранения сообщения в кластере превышает установленный порог (log.retention.hours, по умолчанию 168 часов, то есть семь дней), его нужно удалить. Здесь будет оцениваться, соответствует ли время сегмента условию удаления в соответствии с максимальной меткой времени журнала сегмента.Максимальная метка времени сначала выбирает последнюю индексную запись в индексном файле меток времени, и если соответствующее значение метки времени больше чем 0, будет выбрано значение, в противном случае время последней модификации.

Причина, по которой время последней модификации здесь не выбирается напрямую, заключается в том, чтобы избежать непреднамеренного изменения файла журнала сегментов, что может привести к неточному времени.

Если срок действия всех сегментов журнала в разделе истек, будет создан новый сегмент журнала в качестве файла записи для новых сообщений, а затем будет выполнен параметр удаления.

2.3.2 Удаление журнала на основе пробела

параметр По умолчанию инструкция
log.retention.bytes 1073741824 (т.е. 1G), по умолчанию не включено, то есть бесконечность Общий размер файла журнала, а не размер одного сегмента
log.segment.bytes 1073741824 (т. е. 1G) размер сегмента одного журнала

Сначала будет рассчитан размер удаляемого журнала.diff(totalSize-log.rentention.bytes), затем начните с самого старого сегмента, чтобы увидеть набор файлов, которые можно удалить (еслиdiff-segment.size>=0, условие удаления выполнено), и, наконец, выполняется операция удаления.

2.3.3 Удаление журнала на основе смещения начала журнала

В общем случае начальное смещение файла журнала (logStartOffset) будет равно baseOffset первого сегмента журнала, но его значение будет увеличиваться из-за запроса на удаление сообщения.Значение logStartOffset фактически является самым маленьким сообщением в наборе журналов. , Сообщения меньше этого значения будут удалены. Как показано на рисунке выше, мы предполагаем, что logStartOffset=7421048, и процесс удаления журнала выглядит следующим образом:

  • Перейдите от самого старого сегмента журнала, чтобы определить, меньше ли baseOffset следующего сегмента значение logStartOffset или равно ему. Если это так, его необходимо удалить, поэтому будет удален первый сегмент.
  • Следующий осколок осколка 2 имеет baseOffset=6506251
  • У следующего сегмента сегмента 3 есть baseOffset=9751854>7421048, поэтому сегмент 3 не будет удален.

2.4 Сжатие журнала

Предыдущее упоминание в качестве файла конфигурации брокераlog.cleanup.policyКогда значение параметра установлено значение «Compact», будет выполнена операция сжатия. Сжатие здесь отличается от сжатия в обычном смысле. Сжатие здесь означает, что только значение последней версии сообщения с тем же ключом Сохраняется, как показано на следующем рисунке. Перед сжатием смещение непрерывно увеличивается. После сжатия приращение смещения может не быть непрерывным, и только 5 записей сообщений сохраняются.

Каталог журналов Kafkacleaner-offset-checkpointфайл для записи смещения, которое было очищено в каждом разделе каждой темы, благодаря которому файл журнала в разделе можно разделить на две части:cleanУказывает, что он был сжат;dirtyУказывает, что сжатие не выполнялось, как показано на следующем рисунке (активный сегмент не будет участвовать в операции сжатия журнала, поскольку в файл будут записаны новые данные).

-rw-r--r--  1 root root    4 10月 11 19:02 cleaner-offset-checkpoint
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-0/
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-1/
drwxr-xr-x  2 root root 4096 10月 11 20:07 nginx_access_log-2/
-rw-r--r--  1 root root    0 9月  18 09:50 .lock
-rw-r--r--  1 root root    4 10月 16 11:19 log-start-offset-checkpoint
-rw-r--r--  1 root root   54 9月  18 09:50 meta.properties
-rw-r--r--  1 root root 1518 10月 16 11:19 recovery-point-offset-checkpoint
-rw-r--r--  1 root root 1518 10月 16 11:19 replication-offset-checkpoint

#cat cleaner-offset-checkpoint
nginx_access_log 0 5033168
nginx_access_log 1 5033166
nginx_access_log 2 5033168

Когда журнал сжат, он определяет приоритет сжатого журнала в соответствии с отношением грязной части данных к файлу журнала (cleanableRatio), а затем устанавливает отношение сопоставления ключа и смещения для грязной части данных. (сохраните максимальное смещение соответствующего ключа) и сохраните его в SkimpyoffsetMap, а затем скопируйте его.Для данных в сегменте сегмента сохраняются только сообщения, записанные в SkimpyoffsetMap, и размер соответствующих файлов журнала после сжатия будет уменьшен Во избежание появления слишком маленьких лог-файлов и индексных файлов, размер всех сегментов Shard не будет превышать установленный размерlog.segment.bytesзначение размера), на множество фрагментов после фрагмента пакета со сжатием журнала.

Как показано выше, не все сообщения сжимаются передclean checkpointЗначение равно 0, что указывает на то, что данные в этом разделе не были сжаты.После первого сжатия размер файла журнала каждого предыдущего раздела уменьшается, и размер файла журнала будет перемещен одновременно.clean checkpointЗначение смещения от позиции до конца этого сжатия. При втором сжатии первые два фрагмента {0,5 ГБ, 0,4 ГБ} будут объединены в группу, {0,7 ГБ, 0,2 ГБ} будут объединены в группу для сжатия и так далее.

Как показано на рисунке выше, основной процесс сжатия журнала выглядит следующим образом:

  1. рассчитатьdeleteHorizonMsЗначение: когда значение сообщения пусто, сообщение будет храниться в течение определенного периода времени и будет удалено при следующем сжатии журнала после истечения времени ожидания, поэтому оно будет рассчитано здесь.deleteHorizonMs, по этому значению определяется, что шард лога с пустым значением можно удалить. (deleteHorizonMs = clean部分的最后一个分片的lastModifiedTime - deleteRetionMs, deleteRetionMs настраивается через конфигурационный файл log.cleaner.delete.retention.ms, по умолчанию 24 часа).
  2. Определите диапазон смещения сжатой грязной части [firstDirtyOffset, endOffset): гдеfirstDirtyOffsetУказывает начальное смещение грязной воды, как правило, равноеclear checkpointценность,firstUncleanableOffsetУказывает минимальное смещение, которое не может быть очищено, которое обычно равно baseOffset активного сегмента, а затем пересекает сегмент журнала из позиции firstDirtyOffset и заполняет отношение сопоставления между ключом и смещением в SkimpyoffsetMap. заполнен или достигает верхнего пределаfirstUncleanableOffset, вы можете определить предел сжатия журналаendOffset.
  3. Группируйте фрагменты журнала в [logStartOffset, endOffset), а затем сжимайте их в соответствии с методом группировки.

3. Копировать

Kafka поддерживает избыточное резервное копирование сообщений, и вы можете установить количество копий соответствующей темы (--replication-factorКоличество копий темы настройки параметров можно указать при создании темы,offsets.topic.replication.factorУстановить тему потребления_consumer_offsetsКоличество реплик, по умолчанию 3), каждая реплика содержит одно и то же сообщение (но не полностью согласовано, данные слейв-копии могут немного отставать от мастер-копии). В наборе реплик каждого раздела одна реплика выбирается в качестве главной реплики (лидера), а остальные реплики являются подчиненными репликами.Все запросы на чтение и запись предоставляются главной репликой.Подчиненная реплика отвечает за синхронизацию данных главную реплику в свой собственный раздел.Если раздел, в котором расположена первичная реплика, выйдет из строя, будет переизбрана новая первичная реплика для предоставления внешних служб.

3.1 Набор ISR

Набор ISR (In-Sync Replica) представляет текущий доступный набор реплик.Лидерная реплика в каждом разделе будет поддерживать набор ISR этого раздела. Доступно здесь означает, что объем сообщений подчиненной реплики не сильно отличается от объема сообщений главной реплики.Реплики, добавленные в набор ISR, должны соответствовать следующим условиям:

  1. Узел, на котором находится реплика, должен поддерживать пульсацию с помощью ZooKeeper.
  2. Смещение последнего сообщения подчиненной реплики должно отличаться от смещения последнего сообщения главной реплики не более чем на установленное пороговое значение (replica.lag.max.messages) или LEO реплики отстает от LEO мастер-реплики не более чем на установленный порог (replica.lag.time.max.ms), официальная рекомендация использовать последнее суждение, и удалено в новой версии kafka0.10.0replica.lag.max.messagesпараметр.

Если условие не соответствует ни одному из вышеперечисленных из копии, будет предложена сбор ISR, когда она снова соответствует вышеуказанным условиям, будет повторно добавлена ​​в коллекцию. ISR вводится в основном синхронизированным с репликой каждого из двух схем асинхронных дефектов репликации (если есть синхронные копии реплики или тайм-аут замедляют общую производительность группы копий для разрешения; все копии, если только асинхронная копия, когда когда сообщению намного за главным копией главного копирования после переизбрания, он потеряет бы это сообщение)

3.2 HW&LEO

HW (High Watermark) – это специальная метка смещения. Когда потребитель потребляет, могут быть получены только сообщения меньшего размера, чем HW, а HW и последующие сообщения невидимы для потребителей. Это значение управляется первичной копией. Когда после всех ведомые реплики в наборе ISR вытащили указанное HW сообщение, главная реплика увеличит значение HW на 1, то есть укажет на следующее смещение смещения, что может обеспечить надежность сообщения перед HW.

LEO (Log End Offset) представляет собой следующее смещение последнего сообщения текущей реплики.Такая отметка есть у всех реплик.Если это мастер-реплика, то при добавлении к ней сообщения производителем ее значение будет +1. Его значение также увеличивается, когда ведомая реплика успешно получает сообщение от ведущей.

3.2.1 Обновление LEO и HW из реплики

Данные ведомой копии поступают из основной копии.Данные получаются путем отправки запроса на выборку в основную копию.Значение LEO подчиненной копии будет храниться в двух местах, одно из которых является узлом, где находится ведомая копия. ), другой — это узел, на котором расположена основная копия, и собственный узел сохраняет LEO В основном для обновления собственного значения HW, главная копия сохраняет LEO подчиненной копии для обновления своего HW. Когда подчиненная копия записывает новое сообщение, она увеличивает свой собственный LEO.Когда главная копия получает запрос на выборку от подчиненной копии, она сначала читает соответствующие данные из своего собственного журнала и обновляет данные, прежде чем вернуть их подчиненной Сохраненная ведомая копия значения LEO. Как только данные ведомой копии будут записаны, она попытается обновить свое собственное значение HW, сравнить LEO с возвращенным HW главной копии в ответе на выборку и принять минимальное значение в качестве нового значения HW.

3.2.2 Обновление основной копии LEO и HW

Первичная реплика обновляет собственное значение LEO при записи в журнал, аналогично вторичной реплике. Значение HW главной копии — это значение HW раздела, которое определяет видимость данных раздела, соответствующих потребителю.В следующих четырех случаях главная копия попытается обновить свое значение HW:

  • Реплика становится первичной: когда реплика становится первичной, кафка попытается обновить значение HW раздела.
  • Брокер аварийно завершает работу, и реплика удаляется из набора ISR: если происходит сбой узла брокера, он проверяет, затронут ли соответствующий раздел, а затем проверяет, нужно ли обновлять значение HW раздела.
  • Когда генератор записывает сообщение в первичную реплику: запись сообщения увеличивает его значение LEO и проверяет, нужно ли изменять значение HW в это время.
  • Когда первичная реплика получает запрос на выборку от вторичной реплики: первичная реплика попытается обновить значение HW раздела при обработке запроса на выборку от вторичной реплики.

Фронт должен попытаться обновить HW, но он может не обновиться.Основная копия сохраняет значение LEO подчиненной копии и собственное значение LEO.Здесь будет сравниваться значение LEO всех копий, соответствующих условиям, и будет выбран HW с наименьшим значением LEO и HW с наибольшим разделением value, где реплика, удовлетворяющая условию, — это реплика, удовлетворяющая одному из следующих двух условий:

  • Реплика находится в коллекции ISR
  • LEO реплики отстает от LEO первичной реплики не более чем на установленный порог (replica.lag.time.max.ms, по умолчанию 10s)

3.3 Сценарий потери данных

Как упоминалось ранее, будут проблемы, если полагаться только на HW для усечения журнала и оценки уровня воды.Как показано на рисунке выше, предполагается, что есть две копии A и B. Сначала A является главной копией, B является ведомой копией, а параметрыmin.insync.replicas=1, то есть, когда ISR имеет только одну копию, он также вернет успех:

  • В исходной ситуации два сообщения были записаны в главную копию A, что соответствует HW=1, LEO=2, LEOB=1, и одно сообщение было записано в подчиненную копию B, что соответствует HW=1, LEO= 1.
  • В это время реплика B инициирует запрос fetchOffset=1 к первичной реплике A, и первичная реплика обновляет LEOB=1 после получения запроса, указывая, что реплика B получила сообщение 0, а затем пытается обновить значение HW.min(LEO,LEOB)=1, то есть обновление не требуется, а затем на ведомую реплику B возвращается сообщение 1 и текущий раздел HW=1. После получения ответа от реплики B записывается лог и обновляется LEO=2, и затем обновляется его HW=1, хотя было написано Два сообщения, но значение HW нужно обновить до 2 в следующем раунде запросов.
  • В это время перезапустите с копии B. После перезапуска журнал будет усечен в соответствии со значением HW, то есть сообщение 1 будет удалено.
  • Передано на главную копию из копирования BA FetchOffset = 1 запрос, первичная копия A в это время, если нет ненормальности, второй шаг такой же, как без проблем, при условии, что основная копия также снижается, затем изменения от Реплика B будет главной копией.
  • Когда копия A будет восстановлена, она станет ведомой копией, и журнал будет усечен в соответствии со значением HW, то есть сообщение 1 будет потеряно, а затем сообщение 1 будет безвозвратно утеряно.

3.4 Сценарий несогласованности данных

Как показано на рисунке, предполагается, что имеются две копии A и B. Вначале A — главная копия, B — подчиненная копия, а параметрыmin.insync.replicas=1, то есть, когда ISR имеет только одну копию, он также вернет успех:

  • В начальном состоянии главная копия A записала два сообщения, соответствующие HW=1, LEO=2, LEOB=1, а подчиненная копия B также синхронизировала два сообщения, соответствующие HW=1, LEO=2.
  • В это время подчиненная копия B отправляет главной копии запрос fetchOffset=2.После получения запроса главная копия A обновляет раздел HW=2 и возвращает значение подчиненной копии B. Если подчиненная копия B уходит вниз в это время, значение HW будет вызвано сбоем записи.
  • Мы предполагаем, что главная копия A в это время также не работает, а подчиненная копия B сначала восстанавливается и становится главной копией.В это время происходит усечение журнала, сохраняется только сообщение 0, а затем служба предоставляется извне, предполагая что сообщение 1 написано извне (это сообщение, в отличие от предыдущего сообщения 1, разные сообщения отмечены разными цветами).
  • После того, как копия А будет запущена, она станет ведомой копией, и усечения журнала не произойдет, потому что HW=2, но сообщение, соответствующее смещению 1, на самом деле несовместимо.

3.5 Механизм эпохи лидеров

Значение HW используется для измерения успешности резервного копирования копии и основы усечения журнала в случае сбоя, что может привести к потере данных и несогласованности данных.Поэтому в новой версии Kafka (0.11) было введено понятие эпохи лидера. .0.0), а эпоха лидера представляет собой пару ключ-значение , где эпоха представляет собой номер версии главной копии лидера, который закодирован с 0. Когда лидер меняется один раз, он увеличивается на 1, а смещение представляет позицию, в которой основная копия версии эпохи записывает первое сообщение. , например, означает, что первая основная копия начинает запись сообщений со смещения 0, означает, что вторая основная копия копия имеет номер версии 1 и начинает писать сообщения со смещения 100, а основная копия будет сохранять информацию в кэше и регулярно записывать в файл контрольной точки, информация будет запрашиваться из кэша каждый раз, когда происходит переключение основной копии. это краткое введение в принцип работы эпохи лидеров:

  • Каждое сообщение будет содержать 4-байтовое значение номера эпохи лидера.
  • В каждом каталоге журналов будет создан файл последовательности эпох лидера для хранения номера версии основной копии и начального смещения.
  • Когда реплика становится первичной репликой, в конец файла последовательности эпохи лидера добавляется новая запись, и каждое новое сообщение становится новым значением эпохи лидера.
  • Когда копия простоя перезапускается, следующее:
    • Восстановите все лидер эпохи из лидера эпоха последовательности.
    • Отправьте запрос LeaderEpoch на главную реплику раздела, запрос содержит последнее значение эпохи лидера в файле последовательности эпох лидера подчиненной реплики.
    • Основная копия возвращает lastOffset ведущей эпохи, соответствующей ведомой копии.Возвращенное значение lastOffset делится на два случая.В одном случае требуется вернуть начальное смещение, которое на 1 больше, чем версия ведущей эпохи в запросе ведомой копии, а другой должен быть равен эпохе лидера в запросе напрямую.Возвращает значение LEO текущей первичной реплики.
    • Если смещение от ведущей эпохи подчиненной реплики больше, чем значение lastOffset, возвращенное от ведущей реплики, то значение последовательности эпохи лидера ведомой реплики будет поддерживаться в соответствии с главной репликой.
    • Сократите локальное сообщение от реплики до смещения LastOffset, возвращаемого первичной репликой.
    • Извлекайте данные из первичной реплики, начиная с реплики.
    • При получении данных, если значение эпохи лидера из копии сообщения, найденного в большем значении, чем их последняя эпоха лидера, это будет значение эпохи лидера, записанное в файл последовательности эпохи лидера, а затем продолжите синхронизацию файлов.

Посмотрите, как механизм Leader EPOCH позволяет избежать двух необычных сцен, упомянутых ранее.

3.5.1 Решение для сценария потери данных

  • Как показано на рисунке, когда ведомое устройство B перезапускается, оно отправляется на главную копию A.offsetsForLeaderEpochRequest, главная и подчиненная копии эпохи равны, тогда A возвращает текущий LEO=2, и в подчиненной копии B нет смещения больше 2, поэтому усечение не требуется.
  • Когда ведомое устройство B отправляет запрос fetchoffset=2 на главную копию A, A отключается, поэтому ведомое устройство B становится главной копией, и значение эпохи обновляется до , а значение HW обновляется до 2.
  • Когда A восстанавливается и становится подчиненной копией, отправляет запрос fetchOffset=2 на B, и B возвращает HW=2, затем подчиненная копия A обновляет HW=2.
  • Первичная реплика B принимает внешние запросы на запись и постоянно инициирует запросы на синхронизацию данных от реплики A к первичной реплике A.

Из вышеизложенного видно, что введение значения эпохи лидера позволяет избежать упомянутой выше потери данных, но здесь следует отметить, что если на первом шаге выше реплика B отправляется на первичную реплику A после вставания.offsetsForLeaderEpochRequestЕсли запрос завершится неудачно, то есть основная копия A также будет отключена в то же время, то сообщение 1 будет потеряно, как указано в следующих сценариях несогласованности данных.

3.5.2 Разрешение сценариев несогласованности данных

  • Отправить на первичную реплику A после восстановления из реплики BoffsetsForLeaderEpochRequestЗапрос, так как основная копия также не работает, копия B станет основной копией и обрежет сообщение 1, а затем получит запись нового сообщения 1.
  • После восстановления реплика А становится подчиненной репликой и отправляет на главную реплику А.offsetsForLeaderEpochRequestRequest, запрошенное значение эпохи меньше, чем у основной копии B, поэтому основная копия B вернет начальное смещение, когда epoch=1, то есть lastoffset=1, поэтому подчиненная копия A урежет сообщение 1.
  • Реплика A извлекает сообщения из первичной реплики B и обновляет значение эпохи .

Видно, что введение эпохи позволяет избежать несогласованности данных, но если обе копии не работают, сценарий потери данных остается.Все предыдущие обсуждения основаны наmin.insync.replicas=1Исходя из данных, необходимо найти компромисс между надежностью и скоростью данных.

4. Производители

4.1 Выбор раздела сообщений

Роль производителя в основном заключается в создании сообщений и хранении сообщений в разделе соответствующей темы в Kafka Конкретный раздел, в котором должно храниться сообщение, определяется следующими тремя стратегиями (приоритет сверху вниз, уменьшается по очереди):

  • Если при отправке сообщения указан раздел, которому принадлежит сообщение, оно будет отправлено непосредственно в указанный раздел.
  • Если раздел сообщения не указан, но сообщениеkey, в соответствии сkeyХэш-значение выбранного раздела.
  • Если первые два не удовлетворяют, раздел будет выбран в циклическом порядке.

4.2 настройка параметра ack и его значение

Завершение производства кластера Kafka для отправки сообщенияrequest.required.acksпараметр для установки уровня надежности данных

  • 1: Значение по умолчанию равно 1, что означает, что ведущая реплика в ISR успешно получает данные и подтверждает их перед отправкой следующего сообщения.В случае выхода из строя мастер-узла может произойти потеря данных.Для подробного анализа см. раздел на упомянутых выше репликах.
  • 0: Указывает, что производитель может продолжить отправку следующего пакета данных, не дожидаясь подтверждения узла.В этом случае эффективность передачи данных самая высокая, но надежность данных самая низкая.
  • -1: указывает, что рабочая сторона должна дождаться получения данных всеми узлами реплик в ISR, прежде чем сообщение будет успешно записано.Надежность самая высокая, но производительность самая низкая.min.insync.replicasЗначение устанавливается равным 1, то в этом случае разрешена только одна копия набора ISR, поэтому также будет потеря данных.

4.3 Идемпотентные свойства

Называемый идемпотентом, это означает, что один или несколько раз запрос ресурса для самого ресурса должен иметь один и тот же результат (за исключением проблем с тайм-аутом сети), или ясно понимать, что влияние одного и того же эффекта производится множество раз для выполнения любой операции и одновременно выполняя воздействие, так что сервер ключей мощности может распознать повторяющийся запрос, а затем отфильтровать повторный запрос, следующая информация обычно достигается идемпотентными характеристиками:

  • Уникально идентифицирует: Чтобы определить, повторяется ли запрос, требуется уникальный идентификатор, и тогда сервер может определить, является ли запрос повторным, на основе этого уникального идентификатора.
  • Журнал запросов, которые уже были обработаны: сервер должен записать обработанный запрос, а затем определить, является ли он повторным запросом по уникальному идентификатору.Если он был обработан, он сразу отклонит его или ничего не сделает, чтобы вернуть успех.

Идемпотентность стороны Producer в kafka означает, что при отправке одного и того же сообщения сообщение будет сохраняться в кластере только один раз, а его идемпотентность устанавливается только при следующих условиях:

  • Может быть гарантирована только идемпотентность рабочего конца в одном сеансе.Если производственный конец неожиданно зависает и по какой-то причине перезагружается, в это время невозможно гарантировать идемпотентность, поскольку нет способа получить информацию о предыдущем состоянии. в это время, то есть не может быть достигнута идемпотентность на уровне сеанса.
  • IDEMPotency не может разрушить несколько разделов тематических разделов, но может гарантировать только идентичность в пределах одного раздела. Когда участвуют несколько разделов сообщений, промежуточные состояния не синхронизируются.

Если вы хотите поддержать случай обрушения или разрушения нескольких сообщений сеанс разделов, нам нужно использовать транзакционную кафку для достижения.

Для реализации идемпотентной семантики генератора вводятся понятия Producer ID (PID) и Sequence Number:

  • Идентификатор производителя (PID): Каждому производителю при инициализации назначается уникальный PID, и назначение PID прозрачно для пользователей.
  • Порядковый номер: для данного PID порядковый номер монотонно увеличивается от 0, каждый раздел темы будет генерировать независимый порядковый номер, и производитель будет добавлять порядковый номер к каждому сообщению при отправке сообщения. Сторона брокера кэширует порядковый номер отправленного сообщения.Только сообщение, которое на 1 больше, чем порядковый номер последнего отправленного сообщения в разделе кеша, будет принято, а остальные будут отклонены.

4.3.1 Процесс отправки сообщений на стороне производства

Ниже приведено краткое введение в рабочий процесс отправителя сообщений, который поддерживает идемпотентность.

  1. Производственная сторона добавит данные в RecordAccumulator через Kafkaproducer и определит, нужно ли создавать новый ProducerBatch при добавлении данных.
  2. Задний этап производственного конца запускает поток отправки, который определяет, нужно ли сбросить текущий PID, причина сброса в том, что некоторые разделы сообщений повторяются несколько раз и, наконец, терпят неудачу из-за удаления тайм-аута.На этот раз серийный номер Не может быть непрерывным. Последующее сообщение не может быть отправлено, поэтому PID будет сброшен, а соответствующая информация кэша будет очищена. В это время сообщение будет потеряно.
  3. Поток-отправитель решает, нужно ли ему запрашивать новый PID, и если да, то он блокируется до тех пор, пока не будет получена информация о PID.
  4. Отправляющий поток вызываетsendProducerData()Когда метод отправляет данные, принимаются следующие решения:
    • Чтобы определить, можно ли продолжить отправку раздела темы, действителен ли PID и является ли это повторным пакетом, необходимо определить, был ли отправлен предыдущий пакет. Текущий раздел темы будет пропущен до тех пор, пока не будет отправлен предыдущий пакет.
    • Если соответствующий ProducerBatch не назначает соответствующую информацию о PID и серийном номере, она будет установлена ​​здесь.

4.3.2 Процесс приема сообщений сервера

После того, как сервер (брокер) получит запрос на запись данных, отправленный производителем, он вынесет некоторые суждения, чтобы определить, могут ли данные быть записаны.Здесь также представлен процесс работы, связанный с идемпотентностью.

  1. Если запрос имеет набор функций идемпотента, он проверит, есть ли у ClusterResource разрешение IdempotentWrite, если нет, он вернет ошибку.CLUSTER_AUTHORIZATION_FAILED.
  2. Проверьте информацию о PID.
  3. Проверьте, повторяется ли пакет в соответствии с порядковым номером пакета.Сервер будет кэшировать последние 5 пакетных данных раздела темы, соответствующего каждому PID.Если есть повторение, он напрямую вернет успех записи, но реальный операция записи данных не будет выполнена.
  4. Если есть PID и неповторяющаяся партия, сделайте следующее:
    • Определено, имеет ли PID кэш.
    • Если нет, определяется, начинается ли серийный номер с 0. Если он представлен как новый PID, PID записывается в кэш (включая информацию о PID, EPOCH и серийном номере), затем выполняется запись данных. операция; если нет Однако серийный номер не начинается с 0, и ошибка возвращается напрямую, указывая на то, что срок действия PID истек на сервере и истек или PID записан.
    • Если PID существует, он проверит, соответствует ли эпохальная версия PID серверу, и вернет ошибку, если она несовместима и порядковый номер не начинается с 0. Если эпоха непостоянна, но порядковый номер начинается с 0, его можно записать нормально.
    • Если версия эпохи непротиворечива, он запросит, является ли последний порядковый номер в кеше непрерывным, если нет, будет возвращена ошибка, в противном случае он будет записан нормально.

5. Потребители

Потребители в основном извлекают сообщения из кластера Kafka, а затем выполняют соответствующую логику потребления. Прогресс потребления потребителем контролируется им самим, что повышает гибкость потребления. Например, потребитель может контролировать повторное потребление определенных сообщений или пропускать определенные сообщения. потреблять.

5.1 Группа потребителей

Несколько потребителей могут образовывать группу потребления, и каждый потребитель принадлежит только к одной группе потребления. Каждый раздел темы подписки группы потребителей назначается только потребителю в группе потребителей для обработки, а разные группы потребителей изолированы друг от друга и не имеют зависимостей. Одно и то же сообщение может использоваться только одним потребителем в группе потребителей. Если вы хотите, чтобы одно и то же сообщение было обработано несколькими потребителями, каждый потребитель должен принадлежать к разным группам потребителей, и в соответствующей группе потребителей должен быть только один потребитель. Введение потребительской группы может реализовать «эксклюзивный» или «транслируемый» эффект потребления.

  • В группе потребителей может быть несколько потребителей, и число поддерживает динамическое изменение.
  • Каждый раздел в теме подписки группы потребителей будет назначен только одному потребителю в группе потребителей.
  • group.id идентифицирует группу потребителей, и один и тот же элемент принадлежит к той же группе потребителей.
  • Различные группы потребителей изолированы друг от друга и не влияют друг на друга.

Как показано на рисунке, группа потребителей 1 содержит двух потребителей, из которых потребителю 1 назначен раздел потребления 0, а потребителю 2 назначен раздел потребления 1 и раздел 2. Кроме того, введение групп потребителей также поддерживает горизонтальное расширение и отказоустойчивость потребителей.Например, из приведенного выше рисунка видно, что потребитель 2 имеет недостаточную емкость потребления.По сравнению с потребителем 1 прогресс потребления относительно отстает.Мы можем добавить больше в группу потребителей. Потребитель, чтобы улучшить свою общую покупательную способность, как показано на рисунке ниже.

Предполагая, что машина, на которой расположен потребитель 1, не работает, группа потребителей отправит повторную балансировку и предположит, что раздел 0 выделен потребителю 2 для потребления, как показано на следующем рисунке. Количество потребителей в одной и той же группе потребителей не является максимально возможным, а максимальное количество разделов, соответствующих теме, не может превышать количество разделов, оно снова изменится, если количество потребителей в группе не изменится и не произойдет перебалансировка .

5.2 Смещение потребления

5.2.1 Тема вытеснения потребителей

Kafka 0.9 стала сохранять информацию о смещении потребителя во внутреннем топике (__consumer_offsets) кластера.Тема по умолчанию составляет 50 партиций.Формат каждой записи журнала: , ключом которой является партиция топика, который в основном хранит информацию о темах, разделах и группах потребителей, значением является объект OffsetAndMetadata, который в основном включает такую ​​информацию, как смещение, время отправки смещения и пользовательские метаданные. Только когда группа потребителей отправляет смещение в kafka, данные будут записываться в эту тему.Если потребитель сохранит информацию о смещении потребления во внешнем хранилище, информации о смещении потребления не будет.Следующее можно сделать черезkafka-console-consumer.shСкрипт для просмотра информации о смещении потребления темы.

# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

[consumer-group01,nginx_access_log,2]::OffsetAndMetadata(offset=17104625, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)
[consumer-group01,nginx_access_log,1]::OffsetAndMetadata(offset=17103024, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)
[consumer-group01,nginx_access_log,0]::OffsetAndMetadata(offset=17107771, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)

5.2.2 Автоматическая подача данных о смещении потребления

Потребитель может установить параметрыenable.auto.commitконтролировать, следует ли фиксировать автоматически или вручную, если значениеtrueЭто означает автоматическую отправку, и информация о смещении потребления будет регулярно предоставляться на фоне потребителя, а временной интервал устанавливаетсяauto.commit.interval.ms(по умолчанию 5 секунд).

Но если он настроен на автоматическую фиксацию, будут следующие проблемы:

  1. Могут быть дублированные данные о перемещении, отправленные в топик о перемещении потребления, потому что сообщение в тему пишется каждые 5 секунд, независимо от того, есть ли новая запись о потреблении, что будет генерировать большое количество сообщений с одним и тем же ключом. , требуется только одно сообщение, поэтому для очистки данных необходимо полагаться на упомянутую выше стратегию сжатия журнала.
  2. Повторное потребление, если предположить, что временной интервал для подачи смещения составляет 5 секунд, тогда, если в течение 5 секунд произойдет перебалансировка, все потребители начнут потреблять с последнего отправленного смещения, а данные, потребляемые в течение периода, будут потребляться снова.

5.2.3 Ручная подача расхода вытеснения

Ручная отправка требуетenable.auto.commitустановлено значениеfalse, а затем ход потребления контролируется бизнес-потребителем, а ручная отправка делится на следующие три типа:

  • Синхронное смещение фиксации вручную: если вызывается метод синхронной фиксацииcommitSync(), последнее смещение, полученное опросом, будет отправлено в кластер kafka, и он будет ждать, пока отправка будет успешной, прежде чем отправка будет успешной.
  • Смещение асинхронной фиксации вручную: вызовите метод асинхронной фиксацииcommitAsync(), он вернется сразу после вызова этого метода без блокировки, а затем вы сможете выполнить соответствующую логику обработки исключений через функцию обратного вызова.
  • Заданное смещение представления: указанное смещение представления также делится на асинхронное и синхронное, параметры — Map, где ключ — раздел сообщения, а значение — объект смещения.

5.3 Координатор группы

Групповой координатор - это услуга. Каждый узел в кластере KAFKA начнет такой сервис, когда он запускается. Эта услуга в основном используется для хранения информации метаданных, связанных с группами потребления. Каждая потребительская группа выберет один координатор, отвечает за хранение Информация о смещении потребления каждого раздела в группе. Основными этапами выбора являются следующими:

  • Предпочтительно определить, в каком разделе должна храниться информация о смещении группы потребителей: Как упоминалось выше, число разделов темы __consumer_offsets по умолчанию равно 50. Следующий алгоритм может использоваться для вычисления того, в каком разделе должна храниться информация о смещении соответствующего потребителя. группа должна храниться в .partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)вgroupIdэто идентификатор группы потребителей, который указан потребителем,groupMetadataTopicPartitionCountколичество тематических разделов.
  • Найдите посредника узла, соответствующего лидеру раздела в соответствии с разделом, а координатор посредника является координатором группы потребителей.

5.4 Механизм ребалансировки

5.4.1 Сценарии, в которых происходит ребалансировка

Следующие сценарии вызовут операцию перебалансировки:

  1. В группу потребителей добавляется новый потребитель.
  2. Потребители пассивно находятся в автономном режиме. Например, если потребитель в течение длительного времени не отправлял запрос пульса координатору группы из-за долговременного GC и сетевой задержки, потребитель будет считаться не в сети и исключен.
  3. Потребитель добровольно выходит из группы потребителей.
  4. Количество любых разделов тем, на которые подписана группа потребителей, изменяется.
  5. Потребитель отписывается от темы.

5.4.2 Процесс операции ребалансировки

Осуществление ребалансировки можно разделить на следующие этапы:

  1. найтиGroup Coordinator: потребитель выберет узел с наименьшей нагрузкой из кластера kafka для отправкиGroupCoorinatorRequestзапрос и обработка возвращенного ответаGroupCoordinatorResponse.其中请求参数中包含消费组的id,响应中包含Coordinator所在节点id、host以及端口号信息。
  2. Join group: Когда потребитель получает информацию координатора, она отправит запрос координатору для присоединения к группе потребителейJoinGroupRequest, когда все потребители отправят запрос, координатор выберет потребителя в качестве лидера, а затем отправит потребителю информацию о члене группы, подписке и другую информацию (формат ответаJoinGroupResponseСм. таблицу ниже), руководитель отвечает за распределение планов потребления.

JoinGroupRequestформат данных запроса

название тип инструкция
group_id String идентификатор потребителя
seesion_timeout int Если координатор не получает пульсирующее сообщение в течение времени, указанного параметром session_timeout, потребитель считается не в сети.
member_id String Идентификатор, присвоенный потребителю координатором
protocol_type String Протокол, реализованный группой потребителей, по умолчаниюsonsumer
group_protocols List Содержит все типы PartitionAssignor, поддерживаемые этим потребителем.
protocol_name String Тип PartitionAssignor
protocol_metadata byte[] Информация о потребительской подписке, сериализованная для различных типов разделения, включая пользовательские данные userdata

JoinGroupResponseформат данных ответа

название тип инструкция
error_code short код ошибки
generation_id int Хронологическая информация, назначенная координатором
group_protocol String Тип PartitionAssignor, выбранный координатором
leader_id String Лидер Member_ID.
member_id String Идентификатор, присвоенный потребителю координатором
members Коллекция карт Вся информация о потребительской подписке в группе потребителей
member_metadata byte[] Соответствующая информация о потребительской подписке
  1. Synchronizing Group StateСтадия: Когда ведущий потребитель завершает распределение плана потребления, он отправляетSyncGroupRequestЗапрос к координатору, другие ноды, не являющиеся лидерами, также отправят запрос, но параметр запроса будет пустым, и тогда координатор присвоит результат как ответSyncGroupResponseРаспространяемый потребителю запрос и соответствующий формат данных выглядят следующим образом:

SyncGroupRequestформат данных запроса

название тип инструкция
group_id String идентификатор группы потребителей
generation_id int Информация о возрасте, сохраненная группой потребителей
member_id String Идентификатор потребителя, назначенный координатором
member_assignment byte[] Результат назначения раздела

SyncGroupResponseформат данных ответа

название тип инструкция
error_code short код ошибки
member_assignment byte[] раздел, назначенный текущему потребителю

5.4.3 Стратегия распределения раздела

Kafka предоставляет три стратегии назначения разделов: RangeAssignor, RoundRobinAssignor и StickyAssignor. Реализация каждого алгоритма кратко описана ниже.

  1. RangeAssignor:Kafka будет использовать эту стратегию для выделения разделов по умолчанию.Основной процесс выглядит следующим образом.

    • Отсортируйте разделы по всем темам подписки, чтобы получить наборTP={TP0,Tp1,...,TPN+1}.
    • Отсортируйте всех потребителей в группе потребителей лексикографически по имени, чтобы получить наборCG={C0,C1,...,CM+1}.
    • рассчитатьD=N/M,R=N%M.
    • Потребитель Ci получает начальную позицию раздела потребления = D*i+min(i,R) и общее количество разделов, полученных Ci=D+(если (i+1>R)0 иначе 1).

    Предположим, что в группе потребителей есть два потребителя {C0, C1}, группа потребителей подписывается на три темы {T1, T2, T3}, каждая тема имеет три раздела, всего 9 разделов {TP1, TP2,.. .,TP9}. С помощью приведенного выше алгоритма мы можем получить D = 4, R = 1, тогда группа потребления C0 разделит потребление на {TP1, TP2, TP3, TP4, TP5}, а C1 будет потреблять раздел {TP6, TP7, TP8. , ТР9}. Здесь есть проблема: если его нельзя разделить поровну, то первые несколько потребителей займут еще один раздел.

  2. RoundRobinAssignor:Чтобы использовать эту стратегию, необходимо выполнить следующие два условия: 1) все потребители в группе потребления должны подписываться на одну и ту же тему; 2) все потребители в одной группе потребления указывают одинаковое количество потоков для каждой темы, когда они создан.

    • Отсортировать все разделы всех тем по хэш-значению, полученному по теме + разделу.
    • Отсортируйте всех потребителей лексикографически.
    • Разделы назначаются потребителям в циклическом порядке.
  3. StickyAssignor: этот метод распределения был введен в версии 0.11, в основном для обеспечения следующих характеристик: 1) обеспечить максимально сбалансированное распределение; 2) при перераспределении сохранить как можно больше существующих выделений. Первый имеет приоритет над вторым.

6. Резюме

В этой статье мы сосредоточимся на характеристиках Kafka и подробно расскажем о ее принципе и реализации.Благодаря углубленному анализу тем и журналов мы понимаем механизм хранения, поиска и удаления внутренних сообщений Kafka. Внедрение концепции ISR в систему реплик решает соответствующие недостатки двух схем синхронной реплики и асинхронной репликации, а появление механизма ведущей эпохи решает проблемы потери данных и несогласованности данных. Алгоритм выбора раздела на производственной стороне обеспечивает баланс данных, а поддержка функции идемпотента решает проблему дублирования сообщений, существовавшую ранее.

Наконец, соответствующие принципы со стороны потребителя, механизмы изоляции группы потребителей для достижения стороны потребителя новостей, оба также имеют эксклюзивную поддержку широковещательных сцен и механизм повторной балансировки, чтобы гарантировать надежность и масштабируемость потребителя.

использованная литература

[1] Сюй Цзюньмин, Анализ исходного кода Apache Kafka, [M], Пекин, Electronics Industry Press, 2017.

[2] Углубленный анализ Кафки.

[3] Глубокое понимание распределенных очередей сообщений на основе Kafka и ZooKeeper..

[4] Идемпотентность Реализация транзакционности Kafka.

[5] Обсуждение уровня воды Кафки (высокой отметки) и эпохи лидера.

[6] Как назначить разделы в потребителях Kafka.