1. Кластер Кафка
Kafka использует Zookeeper для хранения информации об участниках кластера (брокерах). Каждый брокер имеет уникальный идентификаторbroker.id
, используемый для идентификации себя в кластере, который можно найти в файле конфигурацииserver.properties
быть настроены в , или автоматически сгенерированы программой. Ниже представлен процесс автоматического создания кластера брокеров Kafka:
- Когда каждый брокер запускается, он будет в Zookeeper.
/brokers/ids
создать путь临时节点
, и поставить свойbroker.id
писать, тем самым регистрируя себя в кластере; - При наличии нескольких брокеров все брокеры создаются в Zookeeper на конкурентной основе.
/controller
Узел, так как узлы в Zookeeper не будут повторяться, должен быть успешно создан только один брокер, который в настоящее время называется брокером-контроллером. Помимо функций других брокеров,Также отвечает за управление состоянием тематических разделов и их реплик.. - Когда брокер не работает или автоматически завершает работу, вызывая истечение времени ожидания сеанса Zookeeper, будет инициировано событие наблюдателя, зарегистрированное в Zookeeper, и Kafka выполнит соответствующую отказоустойчивую обработку; если время простоя связано с брокером-контроллером, он также Инициировать выборы нового контроллера.
Во-вторых, механизм копирования.
Чтобы обеспечить высокую доступность, разделы Kafka мультикопированы, и в случае потери одной копии данные раздела также можно получить из других копий. Однако для этого требуется, чтобы данные соответствующей копии были полными, что является основой согласованности данных Kafka, поэтому необходимо использоватьcontroller broker
для специализированного управления. Механизм репликации Kafka будет подробно объяснен ниже.
2.1 Разделы и реплики
Темы Kafka разделены на несколько разделов, а раздел — это основная единица хранения в Kafka. Каждый раздел может иметь несколько реплик (можно использовать при создании темыreplication-factor
параметры указать). Одна из реплик является ведущей репликой, и все события отправляются непосредственно на ведущую реплику, другие реплики являются ведомыми репликами, которые необходимо реплицировать, чтобы обеспечить согласованность данных с ведущей репликой. Копия-последователь станет новым лидером.
2.2 Механизм ISR
Каждый раздел имеет список ISR (синхронизированных реплик), в котором хранятся все синхронизированные доступные реплики. Ведущая копия должна быть синхронной копией, а подчиненная копия должна соответствовать следующим условиям, чтобы считаться синхронной копией:
- Имеется активная сессия с Zookeeper, то есть пульсации должны регулярно отправляться в Zookeeper;
- Сообщение было получено от ведущей реплики с малой задержкой в течение заданного времени.
Если реплика не соответствует вышеуказанным условиям, она будет удалена из списка ISR и не будет добавлена снова, пока условия не будут выполнены.
Вот пример создания темы: используйте--replication-factor
Укажите коэффициент копирования 3, который будет использоваться после успешного создания.--describe
Команда может видеть, что раздел 0 имеет три реплики 0, 1 и 2, и все три реплики находятся в списке ISR, из которых 1 является ведущей репликой.
2.3 Неполные лидерные выборы
Для механизма реплики существует необязательный параметр конфигурации на уровне брокера.unclean.leader.election.enable
, значение по умолчанию — fasle, что означает, что неполные выборы лидера запрещены. Это нужно для того, чтобы разрешить неполностью синхронизированной реплике стать ведущей репликой, когда ведущая реплика умирает, а в ISR нет других доступных реплик, что может привести к потере данных или несогласованности данных.В некоторых случаях требования к согласованности данных высоки. , Для сценариев (таких как финансовая сфера) это может быть неприемлемо, поэтому его значение по умолчанию равно false, если вы можете допустить некоторые несоответствия данных, вы можете настроить его на true.
2.4 Минимальные реплики синхронизации
Другим важным параметром механизма ISR являетсяmin.insync.replicas
, можно настроить на уровне брокера или темы, что означает, что в списке ISR должно быть как минимум несколько доступных реплик. Здесь предполагается, что установлено значение 2, тогда, когда количество доступных реплик меньше этого значения, весь раздел считается недоступным. В этот момент, когда клиент записывает данные в раздел, будет выдано исключениеorg.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
2.5 Отправить подтверждение
Kafka имеет необязательный параметр ack для производителя, который указывает, сколько реплик раздела должно получить сообщение, прежде чем производитель сочтет сообщение успешно записанным:
- acks=0: сообщение считается успешным, когда оно отправлено, и оно не будет ждать ответа от сервера;
- acks=1: пока ведущий узел кластера получает сообщение, производитель получит успешный ответ от сервера;
- acks=all: производитель получит успешный ответ от сервера только тогда, когда все узлы, участвующие в репликации, получат сообщение.
3. Запрос данных
3.1 Механизм запроса метаданных
Среди всех реплик только реплика-лидер может читать и писать сообщения. Поскольку ведущие реплики разных разделов могут находиться на разных посредниках, если посредник получает запрос на раздел, но ведущая реплика раздела не находится на посреднике, он вернетNot a Leader for Partition
ответ об ошибке. Для решения этой проблемы Kafka предоставляет механизм запроса метаданных.
Сначала каждый брокер в кластере будет кэшировать информацию о репликах разделов всех топиков, а клиент будет периодически отправлять запросы метаданных, а затем кэшировать полученные метаданные. Интервал регулярного обновления метаданных можно настроить для клиента с помощьюmetadata.max.age.ms
указать. С помощью метаданных клиент узнает брокеру, где находится ведущая реплика, а затем напрямую отправляет запросы на чтение и запись соответствующему брокеру.
Если выбор реплики раздела происходит во временном интервале временного запроса, это означает, что исходная кэшированная информация может быть устаревшей, и есть возможность получитьNot a Leader for Partition
В этом случае клиент снова сделает запрос метаданных, затем обновит локальный кеш, а затем перейдет к нужному брокеру для выполнения соответствующей операции.Процесс выглядит следующим образом:
3.2 Видимость данных
Следует отметить, что не все данные, сохраненные на лидере раздела, могут быть прочитаны клиентом.Чтобы обеспечить согласованность данных, клиент может прочитать только данные, сохраненные всеми синхронными репликами (все реплики в ISR). .
3.3 Нулевая копия
Kafka записывает и читает все данные через нулевое копирование. Разница между традиционной копией и нулевой копией заключается в следующем:
Четыре копии и четыре переключения контекста в устаревшем режиме
В качестве примера возьмем отправку файла на диске по сети. В традиционном режиме метод, показанный в следующем псевдокоде, обычно используется для чтения данных файла в память, а затем для отправки данных в памяти через сокет.
buffer = File.read
Socket.send(buffer)
Этот процесс фактически имеет место для четырех копий данных. Сначала данные файла считываются в буфер режима ядра (копия DMA) через системный вызов, а затем приложение считывает данные буфера режима памяти в буфер пользовательского режима (копия ЦП), а затем пользовательская программа отправляет данные через сокет Данные буфера пользовательского режима Скопируйте в буфер режима ядра (копия ЦП) и, наконец, скопируйте данные в буфер сетевой карты через копирование DMA. В то же время он также сопровождается четырьмя переключателями контекста, как показано на следующем рисунке:
sendfile и transferДля реализации нулевого копирования
Ядро Linux 2.4+ пройденоsendfile
Системные вызовы, которые обеспечивают нулевое копирование. После того как данные скопированы в буфер режима ядра через DMA, они напрямую копируются в буфер NIC через DMA без копирования ЦП. Отсюда и термин «нулевое копирование». Помимо сокращения копирования данных, поскольку весь прочитанный файл в сеть отправляетсяsendfile
Вызов завершается всего двумя переключениями контекста для всего процесса, что значительно повышает производительность. Процесс нулевого копирования показан на следующем рисунке:
С точки зрения конкретной реализации передача данных Kafka осуществляется через TransportLayer и его подклассы.PlaintextTransportLayer
изtransferFrom
метод, вызывая FileChannel в Java NIOtransferTo
Метод реализует нулевое копирование следующим образом:
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
Примечание: transferTo
иtransferFrom
Нет никакой гарантии, что можно использовать нулевое копирование. Можно ли на самом деле использовать нулевое копирование, зависит от ОС, если ОС обеспечиваетsendfile
Такой системный вызов с нулевым копированием, эти два метода будут в полной мере использовать преимущества нулевого копирования посредством такого системного вызова, в противном случае с помощью этих двух методов нельзя будет достичь нулевого копирования.
4. Физическое хранилище
4.1 Назначение разделов
При создании топика Kafka сначала решает, как распределить реплики разделов между брокерами, руководствуясь следующими принципами:
- Равномерно распределяйте реплики разделов по всем брокерам;
- Убедитесь, что каждая реплика раздела распределена на другом брокере;
- если используется
broker.rack
Параметр указывает информацию о стойке для брокера, тогда реплики каждого раздела будут максимально выделены брокерам разных стоек, чтобы избежать недоступности одной стойки и недоступности всего раздела.
По указанным выше причинам, если вы создаете тему с 3 репликами на одном узле, обычно будет выдано следующее исключение:
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor
Exception: Replication factor: 3 larger than available brokers: 1.
4.2 Правила хранения данных раздела
Сохранение данных — фундаментальная функция Kafka, но Kafka не хранит данные вечно и не ждет, пока все потребители прочитают сообщения, прежде чем удалять сообщения. Вместо этого Kafka настраивает периоды хранения данных для каждой темы, указывая, как долго данные могут храниться до их удаления, или объем данных, которые могут храниться до их очистки. Соответствует следующим четырем параметрам:
-
log.retention.bytes
: максимально допустимый объем данных перед удалением данных, значение по умолчанию равно -1, что означает отсутствие ограничений; -
log.retention.ms
: количество миллисекунд для сохранения файла данных, если не установлено, используйтеlog.retention.minutes
Значение в , по умолчанию равно нулю; -
log.retention.minutes
: количество минут для хранения файлов данных, если не установлено, используйтеlog.retention.hours
Значение в , по умолчанию равно нулю; -
log.retention.hours
: количество часов хранения файлов данных, значение по умолчанию — 168, что соответствует одной неделе.
Поскольку поиск и удаление сообщений в большом файле требует много времени и чреват ошибками, Kafka делит раздел на несколько сегментов, и сегмент, который в данный момент записывает данные, называется активным сегментом. Активные фрагменты никогда не удаляются. Если вы храните данные за неделю по умолчанию и каждый день используете новый сегмент, то вы увидите, что использование нового сегмента каждый день приведет к удалению самого старого сегмента, поэтому большую часть времени в партиции будет 7 сегментов.
4.3 Формат файла
Обычно формат данных, сохраняемых на диске, совпадает с форматом сообщения, отправляемого производителем. Если производитель отправляет сжатые сообщения, то сообщения в одном пакете сжимаются вместе, отправляются как «свернутые сообщения» (в формате, показанном ниже) и сохраняются на диск. После того, как потребитель прочитает его, он самостоятельно распаковывает упакованное сообщение, чтобы получить конкретную информацию о каждом сообщении.
использованная литература
- Неха Наркхеде, Гвен Шапира, Тодд Палино (автор), Сюэ Миндэн (переводчик). Полное руководство по Кафке. People's Posts and Telecommunications Press. 26 декабря 2017 г.
- Высокопроизводительная архитектура Kafka
Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным