Kafka — широко используемое промежуточное программное обеспечение для сообщений.Эта статья в основном знакомит с основными компонентами Kafka и связанными с ними принципами.
Базовая архитектура
- Брокер: узел обработки промежуточного программного обеспечения сообщений, узел Kafka является посредником, и один или несколько посредников могут формировать кластер Kafka.
- Тема: Kafka классифицирует сообщения по темам, и каждое сообщение, публикуемое в кластере Kafka, должно указывать тему.
- Производитель: производитель сообщений, клиент, который отправляет сообщения брокеру.
- Потребитель: потребитель сообщений, клиент, который читает сообщения от брокера.
- ConsumerGroup: каждый потребитель принадлежит к определенной группе потребителей, сообщение может быть отправлено нескольким различным группам потребителей, но только один потребитель в группе потребителей может использовать сообщение.
- Раздел: физическая концепция, тема может быть разделена на несколько разделов, и каждый раздел упорядочен внутри
Компенсировать
Kafka гарантирует порядок сообщений внутри раздела посредством смещения, а порядок смещения не пересекает разделы. После Kafka 0.10 используйте специальную тему__consumer_offset
Сохраните смещение.__consumer_offset
Метод удержания журнала Compact, то есть тема организует сообщения с одним и тем же ключом
В __consumer_offset хранятся три типа сообщений:
- Сообщение метаданных группы потребителей
- Сообщение о смещении группы потребителей
- Новости надгробия
kafka log
место хранения
Каждый раздел фактически соответствует каталогу журнала: {topicName}-{partitionid}/, и в этом каталоге будет несколько сегментов журнала (LogSegment). Файл LogSegment состоит из двух частей: файла «.index» и файла «.log».
В файле индекса используется метод разреженного индекса, чтобы избежать индексации всех данных журнала и сэкономить место для хранения.Отправить
Используя кеш страниц для последовательного чтения файлов, операционная система может предварительно считывать данные в кеш страниц. В то же время используйте mmap для прямого сопоставления файла журнала с виртуальным адресным пространством.
read() является системным вызовом.Сначала файл копируется с жесткого диска в буфер в пространстве ядра, а затем данные копируются в пространство пользователя.Фактически выполняется две копии данных; mmap() также является системным вызовом, но копирование данных не выполняется.При возникновении ошибки страницы файл копируется напрямую с жесткого диска в пространство пользователя, и выполняется только одно копирование данных. Java использует MappedByteBuffer для инкапсуляции mmapНулевое копирование: данные сообщения отправляются напрямую из кэша страниц в сеть. Обычное чтение файла должно проходить через процесс, показанный на рисунке ниже, и между пользовательским режимом и режимом ядра есть две копии памяти.
Kafka использует нулевое копирование, чтобы избежать копирования сообщений между режимом ядра и режимом пользователя.копировать
-
Каждый раздел имеет ISR (синхронизированные реплики).
-
Каждая реплика в наборе ISR синхронизируется с лидером, а те, которые не входят в него, не могут быть синхронизированы.
-
Только реплики в ISR могут быть избраны лидерами
-
Сообщение, написанное производителем, считается «зафиксированным», только если оно получено всеми репликами в ISR.
-
Смещение конца журнала: смещение последней части данных, записанных производителем в Kafka.
-
Верхний водяной знак: смещение последней части данных, которая была успешно скопирована на другие реплики, то есть данные между смещением конца журнала и верхним водяным знаком были записаны в лидер раздела, но не были успешно резервное копирование на другие реплики
Процесс синхронизации реплик:
Controller
Контроллер аналогичен мастеру кластера, и в основном удается следующие блоки:
- Брокерский онлайн и оффлайн процессинг
- Расширение раздела темы, обработка выделения реплики раздела, выборы лидера
Контроллер выбирается брокером, вытесняющим временную ноду zk, и контроллер устанавливает длинное соединение со всеми брокерами.
Контроллер управляет выбором лидера раздела следующими способами:
метод выборов | инструкция |
---|---|
OfflinePartitionLeaderSelector | Срабатывает, когда лидер уходит в автономный режим |
ReassignedPartitionLeaderSelector | Запускается после переназначения реплики раздела, синхронизация данных завершена |
PreferredReplicaPartitionLeaderSelector | Оптимальные выборы лидеров, инициируемые ручным или автоматическим планированием баланса лидеров |
ControlledShutdownLeaderSelector | Запускается, когда брокер отправляет запрос ShutDown для активного отключения службы. |
Идемпотентность сообщения
проблема:
- До 0.11.0 производитель гарантировал хотя бы один раз
- хотя бы раз может привести к дублированию данных Повторная операция, вызванная задержкой сетевого запроса и т. д., при отправке запроса на повторную попытку сервер не знает, был ли обработан запрос (предыдущая информация о состоянии не записывается), поэтому это может привести к повторной передаче данных. запрос, который является дублированием данных, вызванным собственным механизмом Kafka (механизм повторных попыток при отклонении от нормы)
решение:
- PID (идентификатор производителя), используемый для идентификации каждого клиента производителя.
- порядковые номера, каждое сообщение, отправляемое клиентом, будет иметь соответствующий порядковый номер, и серверная сторона определяет, повторяются ли данные в соответствии с этим значением.
Rebalance
5 ситуаций, которые случаются с ребалансом кафки:
- В Группу потребителей присоединяются новые потребители.
- Некоторые потребители ушли в оффлайн. Потребители не обязательно должны быть в автономном режиме.Например, когда потребитель не может отправить HeartbeatRequest в GroupCoordinator в течение длительного времени из-за длительной задержки GC или сети, GroupCoordinator будет рассматривать потребителя в автономном режиме.
- Некоторые потребители добровольно выходят из Consumer Group.
- Любая тема, на которую подписана Consumer Group, изменяет количество разделов.
- Потребитель вызывает unsubscribe(), чтобы отменить подписку на тему.
Kafka управляет операциями перебалансировки через GroupCoordinator.
- GroupCoordinator — это компонент KafkaServer, используемый для управления группой потребителей.
- GroupCoordinator добавляет Watcher в ZooKeeper
- Получить GroupCoordinator: потребитель отправит ConsumerMetadataRequest любому брокеру в кластере Kafka.
- Потребитель подключается к GroupCoordinator и периодически отправляет HeartbeatRequest.
- Если в HeartbeatResponse есть исключение IllegalGeneration, это означает, что GroupCoordinator инициировал операцию перебалансировки, и в это время вводится ссылка на перебалансировку. Ребалансировка делится на два процесса.
Join Group:
- Потребитель сначала отправляет запрос JoinGroupRequest в GroupCoordinator, который содержит соответствующую информацию о потребителе.
- GroupCoordinator выбирает потребителя, который становится лидером группы, инкапсулирует его как JoinGroupResponse и возвращает его каждому потребителю.
- Только сообщение JoinGroupResponse, полученное лидером группы, инкапсулирует информацию обо всех потребителях, а лидер группы выделяет разделы в соответствии с информацией о потребителях и выбранной стратегией распределения разделов.
Sync Group:
- Каждый потребитель отправит SyncGroupRequest координатору группы, но только запрос SyncGroupRequest лидера группы содержит результат выделения раздела.
- GroupCoordinator формирует SyncGroupResponse и возвращает его всем потребителям в соответствии с результатом выделения разделов лидером группы.
- После того, как потребитель получит SyncGroupResponse и проанализирует его, он может получить раздел, назначенный самому себе.