1. Введение в Кафку
1. Введение
Apache Kafka — это платформа для распределенной обработки потоков. Он имеет следующие характеристики:
- Поддержка публикации сообщений и подписки, аналогично RabbtMQ, ActiveMQ и другим очередям сообщений.
- Поддержка обработки данных в реальном времени
- Гарантия надежной доставки сообщений
- Поддерживает постоянное хранение сообщений и обеспечивает отказоустойчивость сообщений благодаря схеме распределенного хранения с несколькими копиями.
- Высокая пропускная способность, один брокер может легко обрабатывать тысячи разделов и миллионы сообщений в секунду.
2. Основные понятия
Messages And Batches
Базовая единица данных Kafka называется сообщением.Чтобы уменьшить нагрузку на сеть и повысить эффективность, несколько сообщений будут помещены в один и тот же пакет (Пакет), а затем записаны.
Topics And Partitions
Сообщения Kafka классифицируются по темам.Тема может быть разделена на несколько разделов.Раздел представляет собой журнал коммитов. Сообщения записываются в разделы в порядке добавления, а затем считываются в порядке поступления. Kafka обеспечивает избыточность данных и масштабируемость с помощью разделов. Разделы могут быть распределены по разным серверам, а это означает, что тема может охватывать несколько серверов, обеспечивая более высокую производительность, чем один сервер.
Поскольку тема содержит несколько разделов, порядок сообщений во всем разделе не может быть гарантирован, но может быть гарантирован порядок сообщений в одном разделе.
Producers And Consumers
- режиссер
Производители несут ответственность за создание сообщений. В общем, производитель распределяет сообщения равномерно по всем разделам темы, и ему все равно, в какой раздел будет записано сообщение. Если мы хотим записать сообщение в указанный раздел, мы можем сделать это, настроив разделитель.
- потребитель
Потребители являются частью группы потребителей, и потребители несут ответственность за потребление сообщений. Потребители могут подписаться на одну или несколько тем и читать сообщения в порядке их создания. Потребители различают прочитанные сообщения, проверяя их смещения. Смещение — это увеличивающееся число, которое Kafka добавляет при создании сообщения, и оно уникально для каждого сообщения в данном разделе. Потребитель сохраняет последнее смещение чтения каждого раздела в Zookeeper или Kafka, и если потребитель завершает работу или перезапускается, он также может получить смещение, чтобы убедиться, что состояние чтения не потеряно.
Раздел может быть прочитан только одним потребителем в одной группе потребителей, но может быть прочитан несколькими потребителями в разных группах потребителей. Когда потребители из нескольких групп потребителей вместе читают одну и ту же тему, они не влияют друг на друга.
Brokers And Clusters
Автономный сервер Kafka называется брокером. Брокер получает сообщения от производителей, устанавливает смещения для сообщений и фиксирует сообщения на диск для сохранения. Брокеры обслуживают потребителей, отвечая на запросы на чтение разделов, возвращая сообщения, которые были зафиксированы на диске.
Брокеры являются частью кластера. Каждый кластер выберет брокера в качестве контроллера кластера (контроллера).Контроллер кластера отвечает за управление, включая назначение разделов брокерам и мониторинг брокеров.
В кластере раздел подчиняется брокеру, который называется лидером раздела. Раздел может быть назначен нескольким брокерам, и в это время будет происходить репликация раздела. Этот механизм репликации обеспечивает избыточность сообщений для разделов, и в случае сбоя одного посредника другие посредники могут взять на себя управление.
2. Кафка Продюсер
1. Стратегия разделения
Причина раздела
- В кластере удобно расширяться, каждый раздел можно настроить в соответствии с машиной, на которой он расположен, а тема может состоять из нескольких разделов, поэтому кластер может адаптироваться к данным любого размера.
- Параллелизм может быть улучшен, поскольку он может быть прочитан и записан в единицах раздела.
Принцип зонирования
Мы инкапсулируем данные, отправленные производителем, в объект ProducerRecord.
- В случае указания раздела непосредственно используйте указанное значение в качестве значения раздела.
- Если значение раздела не указано, но есть ключ, значение раздела получается путем взятия остатка от хеш-значения ключа и количества разделов темы.
- В случае отсутствия значения раздела или значения ключа целое число генерируется случайным образом при первом вызове (последующие вызовы увеличивают это целое число), а значение раздела получается путем взятия остатка от общего количества разделов, доступных для темы. этого значения, то есть постоянный алгоритм Said Round Robin (циклическое планирование).
2. Гарантия достоверности данных
Чтобы гарантировать, что данные, отправленные производителем, могут быть надежно отправлены в указанную тему, каждый раздел темы должен отправить подтверждение производителю после получения данных, отправленных производителем.Выполнить следующий раунд отправки, в противном случае повторно отправить данные.
Стратегия синхронизации данных реплики
строить планы | преимущество | недостаток |
---|---|---|
Отправить подтверждение, когда более половины синхронизации завершено | низкая задержка | При выборе нового лидера допускайте отказ n узлов, требующих 2n+1 реплик. |
После завершения всех синхронизаций отправляется ack | При выборе нового лидера допускайте отказ n узлов, требующих n+1 реплик. | высокая задержка |
Кафка выбрал второй вариант по следующим причинам:
- Кроме того, чтобы допустить отказ n узлов, первая схема требует 2n+1 копий, а вторая схема требует только n+1 копий, и каждый раздел Kafka имеет большой объем данных, первая схема вызовет много проблем. избыточности данных.
- Хотя сетевая задержка второй схемы будет выше, сетевая задержка меньше влияет на Kafka (передачу в том же сетевом окружении).
ISR
Лидер поддерживает динамический набор синхронизированных реплик (ISR), что означает набор последователей, которые синхронизируются с лидером. Когда ведомый в ISR завершает синхронизацию данных, лидер отправляет подтверждение производителю. Если фолловер длительное время не синхронизирует данные с лидером, то фолловер будет выкинут из ISR.Порог времени задается параметром replica.lag.time.max.ms. После неудачи лидера из ISR избирается новый лидер.
механизм ответа подтверждения
Для некоторых менее важных данных надежность данных не очень высока, и можно допустить небольшую потерю данных, поэтому нет необходимости ждать, пока все подписчики в ISR успешно получат их.
Таким образом, Kafka предоставляет пользователям три уровня надежности.Пользователи могут выбирать следующие конфигурации в соответствии с требованиями надежности и задержки.
установка параметра ack (спрашивает)
- 0: Производитель не ждет подтверждения от брокера. Эта операция обеспечивает наименьшую задержку. Брокер вернется, как только получит его и не запишет на диск. При сбое брокера данные могут быть потеряны.
- 1: производитель ожидает подтверждения от брокера, а лидер раздела возвращает подтверждение после успешного размещения раздела.Если лидер выходит из строя до того, как ведомый будет успешно синхронизирован, данные будут потеряны.
- -1 (все): производитель ожидает подтверждения от брокера, лидера и последователей раздела (в ISR) для успешного размещения перед возвратом подтверждения, но если синхронизация последователя завершена, до того, как брокер отправит ack, лидер выходит из строя, и производитель перезапускается. Отправка сообщения новому лидеру вызовет дублирование данных.
Проблемы согласованности данных (обработка ошибок)
- Ошибка повторителя После сбоя повторитель будет временно исключен из ISR.После восстановления повторителя он прочитает последнее HW, записанное на локальном диске, отрезает часть лог-файла выше HW и начнет с HW лидеру. Когда LEO ведомого больше или равно HW раздела, то есть после того, как ведомый догонит ведущего, он может снова присоединиться к ISR.
- Отказ лидера После отказа лидера из ISR будет выбран новый лидер.После этого, чтобы обеспечить согласованность данных между несколькими репликами, оставшиеся последователи сначала усекут часть своих лог-файлов выше, чем HW.Затем синхронизируют данные от нового лидера.
Примечание. Это гарантирует только согласованность данных между репликами и не гарантирует, что данные не будут потеряны или дублированы.
3. 3. Ровно раз семантика
Установка уровня ACK сервера на -1 гарантирует, что никакие данные не будут потеряны между источником и сервером, то есть семантика хотя бы один раз. Напротив, установка уровня ACK сервера на 0 гарантирует, что каждое сообщение от производителя будет отправлено только один раз, то есть семантика «Не более одного раза».
Хотя бы один раз может гарантировать, что данные не будут потеряны, но не может гарантировать, что данные не будут дублироваться; с другой стороны, хотя бы один раз может гарантировать, что данные не будут дублироваться, но не может гарантировать, что данные не потеряется. Однако для некоторых очень важных сведений, таких как данные транзакций, нижестоящие потребители данных требуют, чтобы данные не дублировались и не терялись, т. е. семантика Exactly Once.
До версии 0.11 Kafka с этим ничего нельзя было поделать, можно было только гарантировать, что данные не будут потеряны, а затем нижестоящие потребители будут дедуплицировать данные глобально. В случае нескольких нижестоящих приложений, для каждого из них требуется глобальная дедупликация, что оказывает большое влияние на производительность.
В версии 0.11 Kafka появилась важная функция: идемпотентность. Так называемая идемпотентность означает, что независимо от того, сколько раз производитель отправляет повторяющиеся данные на сервер, сервер сохранит только один. Идемпотентность в сочетании с семантикой хотя бы один раз составляют семантику Кафки «точно один раз». который:
- По крайней мере один раз + идемпотентность = ровно один раз. Чтобы включить идемпотентность, просто установите для параметра enable.idompotence значение true в параметрах производителя.
Реализация идемпотентности Kafka на самом деле состоит в том, чтобы воспроизвести исходные потребности нисходящего потока для данных восходящего потока. Производитель с включенной идемпотентностью будет назначен PID во время инициализации, а сообщения, отправленные в тот же раздел, будут сопровождаться порядковым номером. Сторона брокера будет кэшировать, когда будет отправлено сообщение с тем же первичным ключом, брокер сохранит только одно.
Однако PID будет меняться после перезапуска, а разные разделы также имеют разные первичные ключи, поэтому идемпотентность не может гарантировать ровно один раз для разделов и сеансов.
идемпотент отправить
Как упоминалось выше, один из способов реализации Exactly Once состоит в том, чтобы сделать нижестоящую систему с идемпотентными характеристиками обработки, а в Kafka Stream сам Kafka Producer является «нисходящей» системой, поэтому, если у Producer могут быть идемпотентные характеристики обработки, то Kafka Stream может поддержка Ровно раз семантика в определенной степени.
Чтобы реализовать идемпотентную семантику производителя, Kafka вводит идентификатор производителя (то есть PID) и порядковый номер. Каждому новому производителю будет присвоен уникальный PID во время инициализации, который полностью прозрачен для пользователя и не будет доступен пользователю.
Для каждого PID каждый
Точно так же сторона брокера также поддерживает порядковый номер для каждого
- Если порядковый номер сообщения на единицу больше, чем порядковый номер, поддерживаемый Брокером, это означает, что в середине есть данные, которые еще не были записаны, то есть не по порядку.В это время Брокер отклоняет сообщение
- Если порядковый номер сообщения меньше или равен порядковому номеру, поддерживаемому Брокером, это означает, что сообщение было сохранено, что является дубликатом сообщения, и Брокер напрямую отбрасывает сообщение.
Приведенный выше дизайн решает две проблемы в версиях до 0.11.0.0:
- После того, как брокер сохраняет сообщение, происходит сбой перед отправкой ACK.Производитель считает, что сообщение не было отправлено успешно, и повторяет попытку, что приводит к дублированию данных.
- Предыдущее сообщение не удалось отправить, следующее сообщение было успешно отправлено, а предыдущее сообщение было успешно отправлено после повторной попытки, что привело к нарушению порядка данных.
3. Потребитель Кафки
1. Метод потребления
Потребитель считывает данные от брокера в режиме извлечения.
Режим push сложно адаптировать к потребителям с разной скоростью потребления, поскольку скорость отправки сообщений определяется брокером. Его цель состоит в том, чтобы доставлять сообщения как можно быстрее, но это может легко привести к тому, что потребители будут слишком поздно обрабатывать сообщения, как правило, из-за отказа в обслуживании и перегрузки сети. В режиме извлечения сообщения могут потребляться с соответствующей скоростью в соответствии с потребляемой мощностью потребителя.
Недостатком режима pull является то, что если у kafka нет данных, потребитель может зациклиться, постоянно возвращая пустые данные. В ответ на это потребители Kafka передают тайм-аут параметра длительности при потреблении данных. Если в настоящее время нет данных, доступных для потребления, потребитель будет ждать в течение определенного периода времени, прежде чем вернуться. Этот период времени является тайм-аутом.
2. Стратегия выделения разделов
В группе потребителей есть несколько потребителей, а топик имеет несколько разделов, поэтому неизбежно потребуется выделение разделов, то есть определение того, какой потребитель использует этот раздел.
У Kafka есть две стратегии распределения: RoundRobin и Range.
roundrobin берет по модулю количество потребителей в соответствии с номером раздела и распределяет их в циклическом режиме.
3. Ведение зачета
Поскольку потребитель может столкнуться со сбоями, такими как сбой питания и время простоя в процессе потребления, после восстановления потребителя он должен продолжать потреблять с позиции до сбоя, поэтому потребитель должен записывать, какое смещение он потребляет в режиме реального времени, поэтому которые он может продолжать потреблять после восстановления сбоя.
группа + тема + раздел (GTP) может определить смещение!
До версии Kafka 0.9 потребитель по умолчанию сохранял смещение в Zookeeper. В настоящее время).
4. Глубокое понимание механизма репликации Kafka
1. Кластер Кафка
Kafka использует Zookeeper для хранения информации об участниках кластера (брокерах). Каждый брокер имеет уникальный идентификатор broker.id, который используется для идентификации его личности в кластере, который может быть настроен в конфигурационном файле server.properties или автоматически сгенерирован программой. Ниже представлен процесс автоматического создания кластера брокеров Kafka:
- Когда каждый брокер запускается, он создает временный узел в пути Zookeeper /brokers/ids и записывает свой Broker.id, чтобы зарегистрироваться в кластере;
- При наличии нескольких брокеров все брокеры будут конкурировать за создание узла /controller в Zookeeper. Поскольку узлы в Zookeeper не дублируются, необходимо успешно создать только один брокер. В настоящее время этот брокер называется брокером-контроллером. Помимо функций других брокеров, он также отвечает за управление состоянием разделов темы и их реплик.
- Когда брокер не работает или автоматически завершает работу, вызывая истечение времени ожидания сеанса Zookeeper, будет инициировано событие наблюдателя, зарегистрированное в Zookeeper, и Kafka выполнит соответствующую отказоустойчивую обработку; если время простоя связано с брокером-контроллером, он также Инициировать выборы нового контроллера.
2. Механизм копирования
Чтобы обеспечить высокую доступность, разделы Kafka мультикопированы, и в случае потери одной копии данные раздела также можно получить из других копий. Однако для этого требуется, чтобы данные соответствующей реплики были полными, что является основой согласованности данных Kafka, поэтому для специального управления необходимо использовать брокера контроллера. Механизм репликации Kafka будет подробно объяснен ниже.
Разделы и реплики
Темы Kafka разделены на несколько разделов, а раздел — это основная единица хранения в Kafka. Каждый раздел может иметь несколько реплик. Одна из реплик является ведущей репликой, и все события отправляются непосредственно на ведущую реплику, другие реплики являются ведомыми репликами, которые необходимо реплицировать, чтобы обеспечить согласованность данных с ведущей репликой. Копия-последователь станет новым лидером.
ISR-механизм
Каждый раздел имеет список ISR (синхронизированных реплик), в котором хранятся все синхронизированные доступные реплики. Ведущая копия должна быть синхронной копией, а подчиненная копия должна соответствовать следующим условиям, чтобы считаться синхронной копией:
- Имеется активная сессия с Zookeeper, то есть пульсации должны регулярно отправляться в Zookeeper;
- Сообщение было получено от ведущей реплики с малой задержкой в течение заданного времени.
Если реплика не соответствует вышеуказанным условиям, она будет удалена из списка ISR и не будет добавлена снова, пока условия не будут выполнены.
неполные выборы лидера
Для механизма реплики на уровне брокера существует необязательный параметр конфигурации unclean.leader.election.enable, значение по умолчанию — fasle, что означает, что неполные выборы лидера запрещены. Это нужно для того, чтобы разрешить неполностью синхронизированной реплике стать ведущей репликой, когда ведущая реплика умирает, а в ISR нет других доступных реплик, что может привести к потере данных или несогласованности данных.В некоторых случаях требования к согласованности данных высоки. , Для сценариев (таких как финансовая сфера) это может быть неприемлемо, поэтому его значение по умолчанию равно false, если вы можете допустить некоторые несоответствия данных, вы можете настроить его на true.
минимальная синхронизированная реплика
Другим важным параметром механизма ISR является min.insync.replicas, который можно настроить на уровне брокера или темы, представляя как минимум несколько доступных реплик в списке ISR. Здесь предполагается, что установлено значение 2, тогда, когда количество доступных реплик меньше этого значения, весь раздел считается недоступным. На этом этапе, когда клиент записывает данные в раздел, он выдает исключение org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: сообщения отклоняются, поскольку синхронизированных реплик меньше, чем требуется.
отправить подтверждение
Kafka имеет необязательный параметр ack для производителя, который указывает, сколько реплик раздела должно получить сообщение, прежде чем производитель сочтет сообщение успешно записанным:
- acks=0 : сообщение считается успешным, когда оно отправлено, и оно не будет ждать ответа от сервера.
- acks=1 : пока ведущий узел кластера получает сообщение, производитель получит успешный ответ от сервера.
- acks=all : производитель получит успешный ответ от сервера только тогда, когда все узлы, участвующие в репликации, получат сообщение.
3. Запрос данных
Механизм запроса метаданных
Среди всех реплик только реплика-лидер может читать и писать сообщения. Так как ведущие реплики разных разделов могут находиться на разных посредниках, если посредник получает запрос на разделение, но ведущая реплика раздела не находится на посреднике, он вернет клиенту ответ «Не лидер для раздела.Ошибка». Для решения этой проблемы Kafka предоставляет механизм запроса метаданных.
Сначала каждый брокер в кластере будет кэшировать информацию о репликах разделов всех топиков, а клиент будет периодически отправлять запросы метаданных, а затем кэшировать полученные метаданные. Интервал периодического обновления метаданных можно указать, настроив metadata.max.age.ms для клиента. С помощью метаданных клиент узнает брокеру, где находится ведущая реплика, а затем напрямую отправляет запросы на чтение и запись соответствующему брокеру.
Если выбор копии раздела происходит во временном интервале временного запроса, это означает, что исходная кэшированная информация может быть устаревшей, а также может быть получен ответ об ошибке Not a Leader for Partition. клиент Он снова сделает запрос метаданных, затем обновит локальный кеш, а затем перейдет к правильному брокеру для выполнения соответствующей операции.Процесс выглядит следующим образом:
видимость данных
Следует отметить, что не все данные, сохраненные на лидере раздела, могут быть прочитаны клиентом.Чтобы обеспечить согласованность данных, клиент может прочитать только данные, сохраненные всеми синхронными репликами (все реплики в ISR). .
наконец
Вы можете подписаться на мою общедоступную учетную запись WeChat, чтобы учиться и развиваться вместе.