Действительно, достаточно статьи о том, как начать работать с Кафкой.

Kafka

Знакомство с Кафкой

Что такое Кафка

Кафка питается отLinkedinРазработанная компанией, это распределенная платформа распределенного потока сообщений с несколькими разделами и несколькими репликами на основе Zookeeper, а также с открытым исходным кодом.Система обработки сообщений на основе режима публикации-подписки.

Основные термины Кафки

Сообщение: единица данных в Kafka называется消息, также известный как запись, можно рассматривать как запись строки в таблице базы данных.

пакет: для эффективности сообщение будет分批次При записи в Kafka пакет относится к набору сообщений.

Тема: Тип сообщения называется主题(Тема), можно сказать, что тема представляет собой тип сообщения. Это эквивалентно классификации сообщений. Темы похожи на таблицы в базе данных.

Раздел: тема может быть разделена на несколько разделов, и разделы в одной теме могут быть не на одной машине, а могут быть развернуты на нескольких машинах, тем самым реализуя kafka伸缩性, разделы в одной теме упорядочены, но нет гарантии, что все разделы в теме упорядочены

Производитель: клиентское приложение, которое публикует сообщения в тему, называется生产者(Производитель), производитель используется для непрерывной отправки сообщений в тему.

Потребитель: клиентская программа, которая подписывается на сообщения темы, называется消费者(Потребитель), потребитель используется для обработки сообщений, созданных производителем.

Группа потребителей: отношения между производителями и потребителями подобны отношениям между поварами и посетителями в ресторане: один повар соответствует нескольким клиентам, то есть один производитель соответствует нескольким потребителям.消费者群组(Группа потребителей) относится к группе, состоящей из одного или нескольких потребителей.

Компенсировать:偏移量(Потребительское смещение) — это своего рода метаданные, которые представляют собой постоянно увеличивающееся целочисленное значение, используемое для записи положения потребителя при перебалансировке, чтобы его можно было использовать для восстановления данных.

брокер: независимый сервер Kafka называетсяbroker, посредник получает сообщение от производителя, устанавливает смещение для сообщения и фиксирует сообщение на диске для сохранения.

брокерский кластер: брокер集群Кластер брокеров состоит из одного или нескольких брокеров, каждый из которых имеет брокера, выступающего в качестве集群控制器роль (автоматически избирается из числа активных участников кластера).

Реплика: Резервное копирование сообщений в Kafka также называется副本(Реплика), количество реплик настраивается.Кафка определяет два типа реплик: Ведущая реплика и Последовательная реплика.Первая предоставляет услуги внешнему миру, а вторая лишь пассивно следует за ней.

Ребаланс: Ребаланс. После того, как экземпляр потребителя в группе потребителей умирает, другие экземпляры потребителя автоматически переназначают раздел темы подписки. Перебалансировка — это важное средство для потребителей Kafka для достижения высокой доступности.

Особенности Kafka (принципы дизайна)

  • 高吞吐、低延迟: Самая большая особенность kafka заключается в том, что она очень быстро отправляет и получает сообщения, Kafka может обрабатывать сотни тысяч сообщений в секунду, а ее минимальная задержка составляет всего несколько миллисекунд.
  • 高伸缩性: Каждый топик (topic) содержит несколько разделов (partitions), причем разделы в топике могут быть распределены по разным хостам (брокерам).
  • 持久性、可靠性: Kafka обеспечивает постоянное хранение данных, сообщения сохраняются на диске и поддерживает резервное копирование данных для предотвращения потери данных Базовое хранилище данных Kafka хранится на основе Zookeeper, данные которого, как мы знаем, могут храниться постоянно.
  • 容错性: Позволяет узлам в кластере выходить из строя, узел выходит из строя, и кластер Kafka может нормально работать.
  • 高并发: поддержка тысяч клиентов для одновременного чтения и записи

Сценарии использования Кафки

  • Отслеживание активности: Kafka можно использовать для отслеживания поведения пользователей. Например, мы часто возвращаемся на Taobao за покупками. В тот момент, когда вы открываете Taobao, ваша регистрационная информация и количество входов в систему будут переданы Kafka в виде сообщений. Когда вы просматриваете и магазин, ваша информация о просмотре, ваш поисковый индекс и ваши увлечения покупками будут передаваться в Kafka в виде сообщений одно за другим, чтобы можно было создавать отчеты, интеллектуальные рекомендации, предпочтения при покупке и т. д.
  • Доставка сообщений: Еще одно основное использование Kafka — доставка сообщений. Приложения отправляют уведомления пользователям, доставляя сообщения. Эти компоненты приложения могут генерировать сообщения, не заботясь о формате сообщения или способе отправки сообщения.
  • Метрики: Kafka также часто используется для записи данных оперативного мониторинга. Это включает в себя сбор данных из различных распределенных приложений и создание централизованной обратной связи для различных операций, таких как сигналы тревоги и отчеты.
  • Ведение журнала: основная концепция Kafka исходит из журнала отправки.Например, мы можем отправлять обновления базы данных в Kafka, чтобы записывать время обновления базы данных, и открывать ее для различных потребителей через Kafka в качестве службы с унифицированным интерфейсом, такой как hadoop, Hbase, Solr и т. д.
  • Потоковая передача. Потоковая передача — это область, которая предлагает множество приложений.
  • Ограничение тока и пиковое сглаживание: Kafka в основном используется в области Интернета, когда в определенное время поступает слишком много запросов, и запросы могут быть записаны в Kafka, чтобы избежать сбоев службы, вызванных прямыми запросами к серверной программе.

Очередь сообщений Кафки

Очередь сообщений Kafka обычно делится на два режима: режим «точка-точка» и режим публикации-подписки.

Kafka поддерживает группы потребителей, что означает, что в Kafka будет один или несколько потребителей.Если сообщение, созданное производителем, потребляется потребителем, то этот режим является двухточечным.

Если сообщения, сгенерированные одним производителем или несколькими производителями, могут потребляться несколькими потребителями одновременно, такая очередь сообщений становится очередью сообщений режима публикации-подписки.

Архитектура системы Кафка

Как показано на рисунке выше, типичный кластер Kafka содержит несколько производителей (которые могут быть просмотрами страниц, созданными веб-интерфейсом, или журналами сервера, системным процессором, памятью и т. д.) и несколько брокеров (Kafka поддерживает горизонтальное расширение. Как правило, чем больше брокеров, тем больше (выше пропускная способность кластера), несколько групп потребителей и кластер Zookeeper. Kafka управляет конфигурацией кластера через Zookeeper, выбирает лидеров и выполняет перебалансировку при изменении группы потребителей. Производители используют режим push для публикации сообщений брокерам, а потребители используют режим pull для подписки и получения сообщений от брокеров.

Основной API

Kafka имеет четыре основных API, которые

  • API производителя, который позволяет приложениям отправлять записи сообщений в одну или несколько тем.
  • Consumer API, который позволяет приложениям подписываться на одну или несколько тем и обрабатывать поток созданных для них записей.
  • Streams API, который позволяет приложению действовать как потоковый процессор, потреблять входные потоки из одной или нескольких тем и генерировать для них выходные потоки, эффективно преобразовывая входные потоки в выходные потоки.
  • Connector API, который позволяет создавать и запускать доступных производителей и потребителей, которые подключают темы Kafka к существующим приложениям или системам данных. Например, коннектор к реляционной базе данных может фиксировать все изменения в таблице.

Почему Кафка такой быстрый

Кафка реализует零拷贝принцип быстрого перемещения данных и предотвращения переключения между ядрами. Kafka может отправлять записи данных пакетами, от производителя в файловую систему (журнал темы Kafka) к потребителю, и эти пакеты данных можно просматривать от начала до конца.

Пакетная обработка может выполнять более эффективное сжатие данных и уменьшить задержку ввода-вывода. Kafka использует последовательную запись на диск, чтобы избежать ненужной траты случайной адресации диска. Дополнительные сведения об адресации диска см.Диск твердых основных знаний, которые должны знать программисты.

Подводя итог, можно выделить четыре основных момента.

  • последовательное чтение и запись
  • нулевая копия
  • сжатие сообщений
  • отправлять партиями

Установка Kafka и важная настройка

Установка Kafka I должна быть более подробно описана в первой статье серии Kafka Подробнее см.Познакомить вас с КафкойЭта статья.

Тогда давайте в основном поговорим о настройке важных параметров в Kafka.Эти параметры очень важны для Kafka.

конфигурация на стороне брокера

  • broker.id

Каждый брокер kafka имеет уникальный идентификатор для представления, этот уникальный идентификатор — broker.id, а его значение по умолчанию — 0. Это значение должно быть уникальным в кластере kafka, это значение можно задать произвольно,

  • port

Если вы запустите kafka с образцом конфигурации, он будет прослушивать порт 9092. Измените параметр конфигурации порта, чтобы установить его на любой порт. Учтите, что если вы используете порты ниже 1024, вам нужно запускать какфа с привилегиями root.

  • zookeeper.connect

Адрес Zookeeper, используемый для сохранения метаданных брокера, указывается через zookeeper.connect. Например, я могу указатьlocalhost:2181Указывает, что Zookeeper работает на локальном порту 2181. мы также можем пройти например, мы можем пройтиzk1:2181,zk2:2181,zk3:2181указать несколько значений параметров для zookeeper.connect. Параметр конфигурации представляет собой набор разделенных двоеточиямиhostname:port/pathсписок со следующими значениями

hostname — это имя компьютера или IP-адрес сервера Zookeeper.

port — номер порта клиента Zookeeper.

/path — необязательный путь Zookeeper, используется путь KafkachrootСреда, если не указано, используется путь по умолчанию.

Если у вас есть два кластера Kafka, допустим, это kafka1 и kafka2, тоzookeeper.connectПараметры можно указать так:zk1:2181,zk2:2181,zk3:2181/kafka1иzk1:2181,zk2:2181,zk3:2181/kafka2

  • log.dirs

Kafka сохраняет все сообщения на диск, а каталог, в котором хранятся эти фрагменты журнала, находится черезlog.dirsЧтобы сформулировать, это локальный системный путь, разделенный запятыми, log.dirs не имеет значения по умолчанию,Вы должны вручную указать его значение по умолчанию. На самом деле есть еще один параметрlog.dir, как известно, эта конфигурация неsДа, по умолчанию нужно настроить только log.dirs, например, можно передать/home/kafka1,/home/kafka2,/home/kafka3Это настраивает значение этого параметра.

  • num.recovery.threads.per.data.dir

В следующих трех случаях Кафка будет использовать可配置的线程池для обработки фрагментов журнала.

Сервер запускается нормально, что используется для открытия фрагментов журнала для каждого раздела;

Перезапустите после сбоя сервера, чтобы проверить и обрезать фрагменты журнала для каждого раздела;

Сервер корректно выключается для закрытия фрагментов журнала.

По умолчанию для каждого каталога журналов используется только один поток. Поскольку эти потоки используются только при запуске и выключении сервера, можно настроить большое количество потоков для достижения цели бесперебойной работы. Особенно на серверах с большим количеством разделов, в случае сбоя, использование операции строки скважины может сэкономить часы времени во время восстановления. При установке этого параметра обратите внимание, что настроенный номер соответствует одному каталогу журналов, указанному в log.dirs. То есть, если для num.recovery.threads.per.data.dir установлено значение 8, а в log.dir указано 3 пути, то всего требуется 24 потока.

  • auto.create.topics.enable

По умолчанию kafka будет использовать три способа автоматического создания тем, вот три случая:

Когда продюсер начинает писать сообщения в тему

Когда потребитель начинает читать сообщения из темы

Когда любой клиент отправляет запрос метаданных в тему

auto.create.topics.enableЯ предлагаю установить для параметра значение false, что означает, что тема не может быть создана автоматически. В нашей онлайн-среде много тем со странными названиями, думаю, это, наверное, из-за того, что этот параметр установлен в true.

Конфигурация темы по умолчанию

Kafka предоставляет множество параметров конфигурации по умолчанию для вновь создаваемых тем, давайте вместе познакомимся с этими параметрами.

  • num.partitions

Параметр num.partitions указывает, сколько разделов должна содержать вновь созданная тема. Если функция автоматического создания темы включена (эта функция включена по умолчанию), количество разделов темы равно значению, указанному в этом параметре. Значение по умолчанию этого параметра равно 1. Обратите внимание, что мы можем увеличить количество тематических разделов, но не уменьшить количество разделов.

  • default.replication.factor

Этот параметр относительно прост. Он указывает количество копий сообщений, сохраненных kafka. Если одна копия выходит из строя, другая может продолжать предоставлять услуги. Значение default.replication.factor по умолчанию равно 1. Этот параметр действителен после включения функция автоматического создания темы. .

  • log.retention.ms

Кафка обычно определяет, как долго данные могут храниться в зависимости от времени. По умолчанию параметр log.retention.hours используется для настройки времени, которое по умолчанию составляет 168 часов, что составляет одну неделю. В дополнение к этому есть два параметра log.retention.minutes и log.retentiion.ms. Эти три параметра имеют одинаковую функцию, все они определяют, как долго сообщение будет удаляться, рекомендуется использовать log.retention.ms.

  • log.retention.bytes

Другой способ сохранить сообщение — определить, истек ли срок его действия. Его значение передается через параметрlog.retention.bytesуказать, действуя на каждый раздел. То есть, если у вас есть тема с 8 разделами и для log.retention.bytes установлено значение 1 ГБ, то тема может хранить до 8 ГБ данных. Следовательно, когда количество разделов темы увеличивается, данные, которые могут храниться всей темой, также увеличиваются.

  • log.segment.bytes

Все вышеперечисленные журналы действуют на фрагменты журнала, а не на отдельные сообщения. Когда сообщения поступают на брокер, они добавляются к текущему сегменту журнала раздела.Когда размер сегмента журнала достигает верхнего предела, указанного log.segment.bytes (по умолчанию 1 ГБ), текущий сегмент журнала будет закрыт и новый сегмент журнала будет открыт. Если сегмент журнала закрыт, он начинает ожидать истечения срока действия. Чем меньше значение этого параметра, тем чаще будут закрываться и размещаться новые файлы, что снижает общую эффективность записи на диск.

  • log.segment.ms

Как упоминалось выше, фрагмент журнала должен дождаться истечения срока действия после закрытия, а затемlog.segment.msЭтот параметр представляет собой сумму параметров, указывающую, как долго журнал должен быть закрыт. Между log.segment.ms и log.retention.bytes нет проблемы взаимного исключения. Фрагменты журнала закрываются при достижении ограничения по размеру или времени, в зависимости от того, что наступит раньше.

  • message.max.bytes

брокер, установивmessage.max.bytesПараметр для ограничения размера одного сообщения, по умолчанию 1000 000, что составляет 1 МБ, если производитель попытается отправить сообщение, превышающее этот размер, не только сообщение не будет получено, но и сообщение об ошибке, возвращенное маклер. Как и другие параметры конфигурации, связанные с байтами, этот параметр относится к размеру сжатого сообщения, то есть, если сжатое сообщение меньше, чем message.max.bytes, фактический размер сообщения может быть больше этого значения.

Это значение оказывает значительное влияние на производительность. Чем выше значение, тем больше времени требуется потоку, отвечающему за обработку сетевых подключений и запросов, для обработки этих запросов. Это также увеличивает размер блока записи на диск, что влияет на пропускную способность ввода-вывода.

  • retention.ms

Указывает, как часто сохраняется сообщение темы.По умолчанию 7 дней, то есть тема может сохранять сообщения только 7 дней. После установки этого значения оно перезапишет значение глобального параметра на стороне брокера.

  • retention.bytes

retention.bytes: указывает, сколько места на диске нужно зарезервировать для темы. Подобно глобальным параметрам, это значение часто полезно в многопользовательских кластерах Kafka. Текущее значение по умолчанию равно -1, что означает неограниченное использование дискового пространства.

Конфигурация параметров JVM

Версия JDK обычно рекомендуется для прямого использования JDK1.8, эта версия также является предпочтительной версией для большинства программистов в Китае.

Когда дело доходит до настроек на стороне JVM, от этого никуда не деться.Эта тема, один из наиболее уважаемых методов настройки в отрасли, заключается в том, чтобы напрямую установить размер кучи JVM равным 6 ГБ, что позволит избежать многих ошибок.

Еще одним важным параметром конфигурации стороны JVM является настройка сборщика мусора, которую часто называютGCнастраивать. Если вы все еще используете Java 7, вы можете выбрать подходящий сборщик мусора в соответствии со следующими правилами:

  • Если ресурсов ЦП машины, на которой расположен брокер, очень много, рекомендуется использовать сборщик CMS. Метод включения заключается в том, чтобы указать-XX:+UseCurrentMarkSweepGC.
  • В противном случае используйте сборщик пропускной способности. Способ открытия указать-XX:+UseParallelGC.

Конечно, если вы уже используете Java 8, просто используйте сборщик G1 по умолчанию. Без какой-либо настройки G1 работает лучше, чем CMS, в основном из-за меньшего количества полных сборщиков мусора, меньшего количества параметров для настройки и т. д., поэтому просто используйте G1.

Как правило, для настройки G1 нужны только эти два параметра.

  • MaxGCPauseMillis

Этот параметр указывает время паузы по умолчанию для каждой сборки мусора. Это значение не является фиксированным, и G1 можно использовать в течение более длительного времени по мере необходимости. Его значение по умолчанию — 200 мс, что означает, что каждый раунд сборки мусора занимает около 200 мс.

  • InitiatingHeapOccupancyPercent

Этот параметр указывает процент памяти кучи, которую G1 может использовать перед началом нового раунда сборки мусора.Значение по умолчанию — 45, что означает, что G1 не включит сборку мусора, пока использование кучи не достигнет 45. В этот процент входят как молодые, так и старые поколения.

Kafka Producer

У Кафки мы называем сторону, производящую сообщение,生产者Например, мы часто возвращаемся на Taobao за покупками. В тот момент, когда вы открываете Taobao, ваша регистрационная информация и количество входов в систему будут переданы серверной части Kafka в виде сообщений. ваши увлечения покупками будут передаваться в серверную часть Kafka в виде сообщений одно за другим, а затем Taobao будет давать умные рекомендации, основанные на ваших увлечениях, так что ваш кошелек никогда не сможет устоять перед искушением, тогда эти производители генерируют消息Как он попал в приложение Kafka? Как проходит процесс доставки?

Хотя генерация сообщения очень проста, процесс отправки сообщения все еще относительно сложен, как показано на рисунке.

Мы начинаем с созданияProducerRecordОбъект запускается, ProducerRecord — это основной класс в Kafka, который представляет собой набор Kafka, который необходимо отправить.key/valueПара ключ-значение, состоящая из имени темы, в которую должна быть отправлена ​​запись, необязательного номера раздела и необязательной пары ключ-значение.

При отправке ProducerRecord нам нужно преобразовать объект пары ключ-значение сериализатором в массив байтов, чтобы их можно было передавать по сети. Затем сообщение достигает разделителя.

Если при отправке указан действительный номер раздела, этот раздел будет использоваться при отправке записей. Если при отправке раздел не указан, раздел будет указан с использованием карты хэш-функции ключа. Если во время отправки нет ни номера раздела, ни номера раздела, раздел будет выделен в циклическом режиме. Как только раздел выбран, производитель знает, в какую тему и раздел отправлять данные.

ProducerRecord также имеет связанную временную метку. Если пользователь не предоставит временную метку, производитель будет использовать текущее время в качестве временной метки в записи. Временная метка, которую использует Kafka, зависит от типа временной метки, настроенного для темы.

  • Если тема настроена на использованиеCreateTime, метка времени в записи производителя будет использоваться брокером.
  • Если тема настроена на использованиеLogAppendTime, метка времени в записи производителя будет перезаписана посредником при добавлении сообщения в его журнал.

Затем это сообщение сохраняется в пакете записей, и все сообщения в этом пакете отправляются в ту же тему и раздел. Отдельный поток отвечает за их отправку брокеру Kafka.

Kafka Broker вернет ответ, когда получит сообщение, и, если запись прошла успешно, он вернет объект RecordMetaData,Он содержит информацию о теме и разделе, а также смещение, записанное в разделе. Вышеупомянутые два типа временных меток также возвращаются пользователю.. Если запись не удалась, возвращается ошибка. Производитель попытается повторно отправить сообщение после получения ошибки, и если он по-прежнему терпит неудачу через несколько раз, он вернет сообщение об ошибке.

Создать производителя Kafka

Чтобы писать сообщения в Kafka, вам сначала нужно создать объект-производитель и установить некоторые свойства. Производитель Kafka имеет 3 обязательных свойства

  • bootstrap.servers

Это свойство указывает список адресов брокера, формат адресаhost:port. Список не обязательно должен содержать все адреса брокеров, производитель найдет информацию о другом брокере у данного брокера. Однако рекомендуется, по крайней мере,两个Информация о брокере, если один из них выйдет из строя, производитель все равно сможет подключиться к кластеру.

  • key.serializer

Брокер должен получить сериализованныйkey/valueзначение, поэтому сообщение, отправленное производителем, должно быть сериализовано перед доставкой брокеру Kafka. Производитель должен знать, как преобразовать объект Java в массив байтов. key.serializer должен быть установлен на реализацию, которая реализуетorg.apache.kafka.common.serialization.SerializerКласс интерфейса, который производитель будет использовать для сериализации ключевых объектов в массивы байтов. Расширьте класс Serializer здесь

Сериализатор — это интерфейс, указывающий, как класс будет сериализован. Его функция — преобразовывать объекты в байты. Классы, реализующие интерфейс сериализатора, в основном включают:ByteArraySerializer,StringSerializer,IntegerSerializer, где ByteArraySerialize — сериализатор по умолчанию, используемый Kafka, есть много других сериализаторов, вы можете передатьздесьПроверьте другие сериализаторы. Одно замечание:key.serializer должен быть установлен, даже если вы собираетесь отправлять только содержимое значения.

  • value.serializer

Как и key.serializer, класс, указанный в value.serializer, сериализует значение.

В следующем коде показано, как создать производителя Kafka, здесь указаны только необходимые свойства, остальные используют конфигурацию по умолчанию.

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

объяснить этот код

  • Сначала создайте объект свойств
  • использоватьStringSerializerСериализатор сериализует пары ключ/значение
  • Здесь мы создаем новый объект-производитель, устанавливаем соответствующий тип для ключа и передаем ему объект Properties.

Кафка отправка сообщения

После создания экземпляра объекта производителя вы можете начать отправлять сообщения. Отправка сообщений в основном выполняется следующими способами.

простой обмен сообщениями

Простейшая отправка сообщений в Kafka выглядит следующим образом:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

producer.send(record);

Производитель в кодеsend()метод требуетProducerRecordОбъект передается в качестве параметра.ProducerRecord имеет множество конструкторов, о которых мы поговорим ниже. Здесь вызывается тот,

public ProducerRecord(String topic, K key, V value) {}

В этот конструктор необходимо передать тему, ключ и значение.

После передачи соответствующих параметров производитель вызывает метод send() для отправки сообщения (объект ProducerRecord). На диаграмме архитектуры производителя видно, что сообщение сначала записывается в буфер в разделе, а затем пакетами отправляется брокеру Kafka.

После успешной отправки метод send() вернетFuture(java.util.concurrent)объект, тип будущего объектаRecordMetadataType, наш код выше не учитывает возвращаемое значение, поэтому соответствующий объект Future не генерируется, поэтому нет способа узнать, было ли сообщение отправлено успешно. Если информация не очень важна или не повлияет на результат, ее можно отправить таким образом.

Мы можем игнорировать ошибки, которые могут возникнуть при отправке сообщений или ошибки на стороне сервера, но до отправки сообщения производитель также может быть другими исключениями. Эти аномалии могут бытьSerializationException(序列化失败),BufferedExhaustedException 或 TimeoutException(说明缓冲区已满), илиInterruptedException(说明发送线程被中断)

Отправлять сообщения синхронно

Второй механизм отправки сообщений выглядит следующим образом

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

Этот способ отправки сообщений является улучшением по сравнению с описанным выше методом отправки: сначала вызывается метод send(), а затем вызывается метод get() для ожидания ответа от Kafka. Метод get() выдает исключение, если сервер возвращает ошибку, и если ошибки нет, мы получаемRecordMetadataОбъект, который можно использовать для просмотра записей сообщений.

Производитель (KafkaProducer) будет иметь два типа ошибок в процессе отправки: одна из них — ошибка повторной попытки, которая может быть устранена повторной отправкой сообщения. Например, ошибка соединения может быть решена повторным установлением соединения;Ошибки могут быть устранены путем переизбрания лидеров разделов. KafkaProducer настроен на автоматический повтор, и если проблема не может быть решена после нескольких попыток, будет выдано исключение повторной попытки. Другой тип ошибки, которую невозможно устранить повторной попыткой, например消息过大Для этого типа ошибки KafkaProducer не будет повторять попытку и напрямую создавать исключение.

Отправляйте сообщения асинхронно

Существует проблема с синхронной отправкой сообщений, то есть одновременно может быть отправлено только одно сообщение, что приведет к невозможности отправки многих сообщений напрямую, что приведет к задержке сообщений и невозможности максимизировать преимущества.

Например, для передачи сообщения между приложением и кластером Kafka требуется 10 мс. Отправка 100 сообщений занимает 1 секунду, если ждать ответа после отправки каждого сообщения, но если异步Таким образом, отправка 100 сообщений займет гораздо меньше времени. Большую часть времени, хотя Кафка вернетсяRecordMetadataсообщение, но нам не нужно ждать ответа.

Для обработки исключений при асинхронной отправке сообщений производитель предоставляет поддержку обратного вызова. Ниже приведен пример обратного вызова

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
        producer.send(producerRecord,new DemoProducerCallBack());


class DemoProducerCallBack implements Callback {

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

Чтобы сначала реализовать обратный вызов, вам нужно определить реализациюorg.apache.kafka.clients.producer.Callbackкласс, этот интерфейс имеет только одинonCompletionметод. Если kafka вернет ошибку, метод onCompletion выдаст ненулевое (не null) исключение, здесь мы его просто выводим, если это производственная среда, требующая более детальной обработки, а затем передаем его, когда метод send() отправляет объект обратного вызова Callback.

Механизм разделения производителей

Kafka читает и записывает данные на основе分区Для большей детализации разделы могут быть распределены между несколькими хостами (брокерами), чтобы каждый узел мог обеспечить независимую запись и чтение данных, а также увеличить пропускную способность кластера Kafka за счет добавления новых узлов.负载均衡Эффект.

Выше мы ввели три способа отправки производителями:不管结果如何直接发送,发送并返回结果,发送并回调. Поскольку сообщение хранится в разделе темы, когда источник отправляет сообщение в тему, как определить, в каком разделе будет храниться сообщение?

На самом деле это встроено в механизм разделения Kafka.

Стратегия разделения

Стратегия разделения Kafka относится к алгоритму, в который раздел отправляется производитель. Kafka предоставляет нам стратегию разделения по умолчанию, а также поддерживает настройку стратегии разделения.

Если вы хотите настроить стратегию разделения, вам нужно отобразить параметры, которые настраивают сторону производителя.Partitioner.class, мы можем взглянуть на этот класс, который находится по адресуorg.apache.kafka.clients.producerпод пакетом

public interface Partitioner extends Configurable, Closeable {
  
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

  public void close();
  
  default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Класс Partitioner имеет три метода, которые объясняются отдельно.

  • partition(): этот класс имеет несколько параметров:topic, указывающий подлежащий передаче предмет;keyПредставляет значение ключа в сообщении;keyBytesУказывает, что сериализованный ключ в разделе передается в виде массива байтов;valueПредставляет значение сообщения;valueBytesПредставляет массив сериализованных значений в разделе;clusterУказывает необработанные данные текущего кластера. Kafka дает вам так много информации в надежде, что вы сможете в полной мере использовать эту информацию, чтобы разделить сообщение и выяснить, в какой раздел его отправить.
  • закрыть() : унаследованоCloseableИнтерфейс может реализовать метод close(), который вызывается при закрытии раздела.
  • onNewBatch(): указывает, что разделитель уведомлен о создании нового пакета.

Среди них метод partition() тесно связан со стратегией разделения.Существуют следующие стратегии разделения

последовательный опрос

При последовательном распространении сообщения равномерно распределяются по каждому разделу, то есть каждый раздел сохраняет сообщение один раз. как ниже

На приведенном выше рисунке показана стратегия опроса.Стратегия опроса — это стратегия по умолчанию, предоставляемая Kafka Producer.Если вы не используете указанную стратегию опроса, Kafka по умолчанию будет использовать стратегию последовательного опроса.

случайный опрос

Короче говоря, случайный опрос заключается в случайном сохранении сообщений в разделе, как показано на следующем рисунке.

Код для реализации случайного распределения требует всего две строки, как показано ниже.

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

Сначала подсчитайте общее количество разделов для темы, а затем случайным образом верните положительное целое число, меньшее его.

По сути, стратегия random также стремится равномерно распределить данные по каждому разделу, но по фактической производительности она уступает стратегии опроса, поэтомуЕсли стремление к равномерному распределению данных, лучше использовать стратегию опроса. На самом деле случайная стратегия — это стратегия разделения, используемая старой версией производителя, а в новой версии она заменена на опрос.

Сохранение сообщений по ключу

Эту стратегию также называютkey-orderingСтратегия, каждое сообщение в Kafka будет иметь свой собственный ключ.Как только сообщение будет определено с помощью ключа, вы можете гарантировать, что все сообщения с одним и тем же ключом попадают в один и тот же раздел, потому что обработка сообщений в каждом разделе имеет последовательный характер, поэтому эта стратегия называется стратегией сохранения порядка по ключу сообщения, как показано на следующем рисунке.

Метод разделения для реализации этой стратегии столь же прост и требует только следующих двух строк кода:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

Вышеуказанные стратегии разбиения являются относительно базовыми стратегиями, кроме того, вы также можете настроить стратегию разбиения.

Механизм сжатия производителя

Термин "сжатие" – это просто взаимозаменяемая идея. Это классическая идея использования процессорного времени для обмена дисковым пространством или объемом передачи операций ввода-вывода в надежде сократить использование диска или передачу операций ввода-вывода с меньшими затратами ресурсов процессора. Меньше сети I /О переводы. Если вы еще этого не знаете, надеюсь, вы сначала прочтете эту статью.Алгоритм сжатия основных знаний, которые необходимо знать программистамА потом вы понимаете, что происходит с сжатием.

Что такое сжатие Kafka

Сообщения Kafka делятся на два слоя: коллекции сообщений и сообщения. Коллекция сообщений содержит несколько элементов журнала, и в элемент журнала инкапсулируется настоящее сообщение. Базовый журнал сообщений Kafka состоит из ряда элементов журнала сбора сообщений. Кафка обычно не работает напрямую с конкретным сообщением, она всегда работает на уровне сбора сообщений.写入работать.

В Kafka сжатие происходит в двух местах: Kafka Producer и Kafka Consumer, зачем включать сжатие? Грубо говоря, новость слишком большая, и ее необходимо变小一点чтобы сообщения шли быстрее.

Используется в Kafka Producercompression.typeвключить сжатие

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

Приведенный выше код указывает на то, что алгоритм сжатия Producer использует GZIP.

Есть компрессия, должна быть декомпрессия, производитель использует алгоритм сжатия для сжатия сообщения и отправляет его на сервер, а затем потребитель распаковывает его.Поскольку используемый алгоритм сжатия отправляется вместе с ключом и значением, потребитель знает, какой алгоритм сжатия использовать.

Настройка важных параметров Kafka

в предыдущей статьеПознакомить вас с КафкойВ этой статье мы в основном вводим параметры построения кластера Kafka. В этой статье мы представим важную конфигурацию производителя Kafka. У производителя есть много настраиваемых параметров. В документе (kafka.apache.org/document ATI…

key.serializer

Сериализация для ключевых ключей, которая реализуетorg.apache.kafka.common.serialization.Serializerинтерфейс

value.serializer

Реализована сериализация значений значенийorg.apache.kafka.common.serialization.Serializerинтерфейс

acks

Параметр acks указывает, сколько реплик раздела должно получить сообщение, прежде чем производитель сочтет сообщение успешно записанным. Этот параметр больше влияет на потерю сообщений.

  • Если acks = 0, это означает, что производитель не знает, было ли сгенерированное им самим сообщение получено сервером, и знает, что оно было успешно записано. Если на пути отправки есть ошибка, производитель не знает об этом, и это еще больше сбивает с толку, потому что сообщение не возвращается. Это похоже на протокол транспортного уровня UDP, пока он отправляется, серверу все равно, принимает он его или нет.
  • Если acks = 1, пока лидер кластера получает сообщение, он возвращает сообщение производителю, сообщая ему, что запись прошла успешно. Если в процессе отправки происходит сбой в сети или лидер не был выбран, а другие условия приводят к сбою записи сообщения, производитель получит сообщение об ошибке.В это время производитель часто повторно отправляет данные. Поскольку отправка сообщений также делится на同步и异步, Kafka решит, следует ли отправлять синхронно или асинхронно, чтобы обеспечить эффективную передачу сообщений. Если вы позволите клиенту ждать ответа от сервера (путем вызоваFutureсерединаget()метод), что, очевидно, увеличивает задержку, и если клиент использует обратные вызовы, это будет исправлено.
  • Если acks = all, в этом случае производитель получит сообщение от сервера только в том случае, если все узлы, участвующие в репликации, получили сообщение. Однако его задержка выше, чем при acks = 1, потому что мы ожидаем получения сообщения более чем одним серверным узлом.

buffer.memory

Этот параметр используется для установки размера буфера памяти производителя, который используется производителем для буферизации сообщений, отправляемых на сервер. Если приложение отправляет сообщения на сервер быстрее, чем может, это приведет к нехватке места у производителя. В этот момент вызов метода send() либо заблокирует, либо выдаст исключение, в зависимости отblock.on.buffer.nullУстановка параметров.

compression.type

Этот параметр указывает, какой алгоритм сжатия включен производителем. По умолчанию сообщения не будут сжиматься, когда их отправятся. Этот параметр может быть установлен на Snappy, GZIP и LZ4, который указывает, какой алгоритм сжатия используется для сжатия сообщения перед отправкой его брокера. Ниже приведено сравнение каждого алгоритма сжатия

retries

Ошибка, которую производитель получает от сервера, может быть временной ошибкой (например, раздел не может найти лидера), в этом случаеreteisЗначение параметра определяет, сколько раз производитель может повторно отправить сообщение, если это число будет достигнуто, производитель прекратит повторную попытку и вернет ошибку. По умолчанию производитель ждет 100 мс между каждой повторной попыткой, этот параметр ожидания можно передатьretry.backoff.msмодифицировать.

batch.size

Если в один и тот же раздел необходимо отправить несколько сообщений, производитель поместит их в один пакет. Этот параметр указывает объем памяти, который может использовать пакет, в байтах. Когда пакет заполнен, все сообщения в пакете будут отправлены. Однако скважины-производители не обязательно ждут, пока пакет не будет заполнен, прежде чем отправлять, и может быть отправлено любое количество сообщений.

client.id

Этот параметр может быть любой строкой, сервер будет использовать ее для идентификации источника сообщения, обычно настраиваемого в журнале.

max.in.flight.requests.per.connection

Этот параметр указывает, сколько сообщений производитель может отправить до получения ответа от сервера, более высокие значения будут занимать больше памяти, но также увеличат пропускную способность. Установка значения 1 гарантирует, что сообщения будут записываться на сервер в том порядке, в котором они были отправлены.

timeout.ms, request.timeout.ms и metadata.fetch.timeout.ms

request.timeout.ms указывает время ответа, которое производитель ожидает, пока сервер вернет его при отправке данных, а metadata.fetch.timeout.ms указывает время, которое производитель ожидает, пока сервер вернет ответ при получении метаданных (например, кто является лидером целевого раздела). Если время ожидания истечет, производитель либо повторит попытку отправки данных, либо вернет ошибку. timeout.ms указывает время, в течение которого брокер ожидает, пока синхронная реплика вернет подтверждение сообщения, которое соответствует конфигурации запросов. Если в течение указанного времени подтверждение от синхронной реплики не получено, брокер вернет ошибку.

max.block.ms

Этот параметр указывает время блокировки производителя при вызове метода send() или использовании метода partitionFor() для получения метаданных.Эти методы блокируются, когда буфер отправки производителя захвачен или метаданные недоступны. Когда время блокировки достигает max.block.ms, производитель выдает исключение тайм-аута.

max.request.size

Этот параметр используется для управления размером запроса, отправляемого производителем. Он может относиться к максимальному размеру одного сообщения, которое может быть отправлено, или к общему размеру всех сообщений в одном запросе.

получить.буфер.байты и отправить.буфер.байты

Kafka реализована на основе TCP, для обеспечения надежной передачи сообщений эти два параметра задают размер буфера для TCP Socket для приема и отправки пакетов соответственно. Если они установлены на -1, используются значения операционной системы по умолчанию. Эти значения могут быть соответствующим образом увеличены, если производитель или потребитель находится в другом центре обработки данных, чем брокер.

Kafka Consumer

использование приложенияKafkaConsumerПодпишитесь на темы из Kafka и получайте сообщения из этих тем перед их сохранением. Сначала приложению необходимо создать объект KafkaConsumer, подписаться на тему и начать принимать сообщения, проверять сообщения и сохранять результаты. Через какое-то время скорость, с которой производитель пишет в тему, превышает скорость, с которой приложение может проверить данные. Что мне делать в это время? Если используется только один потребитель, приложение не сможет идти в ногу со скоростью генерации сообщений, так как несколько производителей пишут сообщения в одну и ту же тему, в это время требуется участие нескольких потребителей в потреблении сообщений. в теме Отправьте сообщение.

Потребители Кафки принадлежат к消费者群组. Потребители в группе подписываются на相同Тема, каждый потребитель получает сообщения для подмножества разделов темы. Ниже приведена схема использования разделов Kafka.

Тема T1 на рисунке выше имеет четыре раздела, а именно: раздел 0, раздел 1, раздел 2 и раздел 3. Мы создаем группу потребителей 1. В группе потребителей есть только один потребитель, который подписывается на тему T1 и получает все сообщения в T1. Поскольку один потребитель обрабатывает сообщения, отправленные в раздел четырьмя производителями, давление немного велико, и требуется помощник, чтобы помочь разделить задачи, поэтому он превращается в следующий рисунок.

Таким образом, потребительская способность потребителей значительно улучшается, но в некоторых средах, например, когда пользователи генерируют много сообщений, сообщений, генерируемых производителями, по-прежнему слишком много для потребителей, поэтому продолжайте увеличивать число потребителей.

Как показано на рисунке выше, сообщения, созданные каждым разделом, могут использоваться потребителями в каждой группе потребителей.Если в группу потребителей добавлено больше потребителей, избыточные потребители будут простаивать, как показано ниже.

Увеличение потребительской группы является главным способом масштабирования мощности расходов. Все вообще, мы можем сделать, увеличивая потребительскую группу расходов水平扩展提升消费能力. Вот почему при создании тем рекомендуется использовать большее количество разделов, что может увеличить количество потребителей для повышения производительности при высокой нагрузке потребления. Кроме того, количество потребителей не должно быть больше количества разделов, потому что лишние потребители простаивают и не помогают.

Важной особенностью Kafka является то, что ей нужно написать сообщение только один раз, и она может поддерживать любое количество приложений для чтения сообщения. Другими словами, каждое приложение может прочитать весь объем сообщений. Чтобы каждое приложение могло прочитать весь объем сообщений, приложение должно иметь разные группы потребителей. Для приведенного выше примера, если мы добавим новую группу потребителей G2, и эта группа потребителей имеет двух потребителей, она превратится в следующую фигуру.

В этом сценарии как группа потребителей G1, так и группа потребителей G2 могут получать полный объем сообщений темы T1, и в логическом смысле они принадлежат разным приложениям.

Подводя итог, если приложению необходимо прочитать полный объем сообщений, настройте группу потребления для приложения; если мощность потребления приложения недостаточна, вы можете рассмотреть возможность добавления потребителей в эту группу потребления.

Группы потребителей и перебалансировка разделов

Что такое потребительская группа

消费者组(Consumer Group)Это группа, состоящая из одного или нескольких экземпляров-потребителей (экземпляров-потребителей), которая представляет собой механизм масштабируемости и отказоустойчивости. потребители в группе потребителей共享Идентификатор группы потребителей, этот идентификатор также называетсяGroup ID, потребители в группе совместно подписываются и потребляют тему.Потребители в одной группе могут потреблять сообщения только из одного раздела, а избыточные потребители будут бездействовать и бесполезны.

Мы упомянули два способа потребления выше

  • Группа потребителей потребляет сообщения в теме. Этот шаблон потребления также называется点对点Метод двухточечного потребления также известен как очередь сообщений.
  • Сообщения в теме потребляются несколькими группами потребителей. Этот режим потребления также называется发布-订阅модель

перебалансировка потребителей

мы начинаем сверху消费者演变图Мы можем знать такой процесс: сначала потребитель подписывается на тему и потребляет сообщения всех ее разделов, затем потребитель присоединяется к группе, а затем к группе присоединяются другие потребители, и пример вновь присоединившихся потребителей分摊Часть сообщения исходного потребителя, такое поведение передачи права собственности на раздел через одного потребителя другим потребителям называется重平衡, английское название также называетсяRebalance. Как показано ниже

Ребалансировка очень важна, она приносит高可用性и伸缩性, мы можем безопасно добавлять или удалять потребителей, но в обычных обстоятельствах мы не хотим, чтобы это происходило. Во время перебалансировки потребители не могут читать сообщения, что делает всю группу потребителей недоступной во время перебалансировки. Кроме того, когда раздел переназначается другому потребителю, текущее состояние чтения сообщения теряется, и может потребоваться очистить кэш, что замедлит работу приложения, прежде чем оно сможет восстановить состояние.

потребителей через组织协调者(Kafka Broker) отправляет тактовые импульсы, чтобы поддерживать себя в качестве члена группы потребителей и подтверждать разделы, которыми он владеет. Для разных групп потребителей их организационные координаторы могут быть разными. Пока потребитель периодически отправляет тактовые импульсы, потребитель считается активным и обрабатывает сообщения в своем разделе. Тактовые импульсы отправляются, когда потребитель извлекает записи или отправляет записи, которые он потребляет.

Если Kafka перестанет отправлять пульсации через некоторое время, сессия (Session) истечет, и координатор организации подумает, что Потребитель умер, и вызовет перебалансировку. Если потребитель выйдет из строя и перестанет отправлять сообщения, координатор организации подождет несколько секунд, чтобы подтвердить, что он мертв, прежде чем запускать перебалансировку. в этот период,Мертвые потребители не будут обрабатывать сообщения. При очистке потребителей потребитель уведомит координатора о том, что он покидает группу, а координатор организации инициирует перебалансировку, чтобы свести к минимуму паузы в обработке.

Ребалансировка — палка о двух концах, она обеспечивает высокую доступность и масштабируемость для групп потребителей, но также имеет некоторые очевидные недостатки (ошибки), которые сообщество пока не может исправить.

Процесс восстановления баланса оказывает огромное влияние на группы потребителей. Поскольку каждый процесс перебалансировки приводит к тому, что все останавливается, обратитесь к механизму сборки мусора в JVM, то есть к Stop The World, STW (цитата из описания последовательного сборщика на стр. 76 в «Углубленное понимание Java Virtual Машина"):

Что еще более важно, он должен приостанавливать все другие рабочие потоки, пока происходит сборка мусора. до окончания его сбора.Stop The WorldНазвание звучит круто, но эта работа на самом деле инициируется и завершается автоматически виртуальной машиной в фоновом режиме, останавливая все потоки нормальной работы пользователя, когда пользователь невидим.Это проблема для многих приложений, которую трудно принять.

Другими словами, в течение периода перебалансировки экземпляры-потребители в группе потребителей перестанут потреблять и будут ждать завершения перебалансировки. И процесс ребалансировки идет медленно...

создать потребителя

Вышеупомянутая теория — это слишком, давайте объясним, как потребители потребляют через код.

Перед прочтением сообщения необходимо создатьKafkaConsumerобъект. Создание объекта KafkaConsumer очень похоже на создание объекта KafkaProducer — поместите свойства, которые необходимо передать потребителю, вpropertiesВ объекте мы остановимся на некоторых конфигурациях Kafka позже, здесь мы просто сначала создадим его, достаточно использовать три свойства, а именно:bootstrap.server,key.deserializer,value.deserializer.

Эти три атрибута использовались много раз, если вам не очень понятно, вы можете обратиться кВоспитать вас значит познакомиться с Кафкой Продюсер

Другое свойствоgroup.idЭто свойство не является обязательным, оно указывает, к какой группе потребителей принадлежит KafkaConsumer. Также можно создавать потребителей, не принадлежащих ни к одной группе.

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

Подписка на тему

Следующим шагом после создания потребителя является подписка на тему.subscribe()Метод принимает в качестве параметра список тем, что относительно просто в использовании.

consumer.subscribe(Collections.singletonList("customerTopic"));

Для простоты мы подписываемся только на тему.customerTopic, переданный параметр является регулярным выражением. Регулярное выражение может соответствовать нескольким темам. Если кто-то создает новую тему и название темы совпадает с регулярным выражением, немедленно будет запущена повторная балансировка, и потребитель получит доступ к новым темам. читать.

Чтобы подписаться на все темы, связанные с тестами, вы можете сделать

consumer.subscribe("test.*");

голосование

Мы знаем, что Kafka поддерживает модель подписки/публикации, а производитель отправляет данные брокеру Kafka, так как же потребитель узнает, что производитель отправил данные? На самом деле потребитель данных, сгенерированных производителем, не знает. KafkaConsumer использует метод опроса для регулярного получения данных от Kafka Broker. Если есть данные, они используются для потребления. Если данных нет, он будет продолжать опрос и ожидание. Ниже приводится метод опроса. Конкретная реализация ожидания

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • Это бесконечный цикл. На самом деле потребитель — это долго работающее приложение, которое циклически запрашивает данные у Kafka.
  • Третья строчка кода очень важна, Kafka должна периодически запрашивать данные в цикле, иначе она будет думать, что Потребитель завис, сработав ребалансировку, и его партиции будут переданы другим потребителям в группе. перейти кpoll()метод супермаркет время, сjava.time.DurationКласс, указывающий, что если для параметра установлено значение 0, метод poll() вернется немедленно, в противном случае он будет ждать, пока брокер вернет данные в течение указанного количества миллисекунд.
  • Метод poll() возвращает список записей. Каждая запись содержит информацию о теме, к которой принадлежит запись, информацию о разделе, в котором находится запись, смещение записи в разделе и пару ключ-значение записи. Обычно мы просматриваем этот список, обрабатывая каждую запись одну за другой.
  • использовать перед выходом из приложенияclose()метод закрытия потребителя. Сетевое соединение и сокет также закрываются, что немедленно запускает перебалансировку, а не ждет, пока координатор группы увидит, что он больше не отправляет контрольные сообщения, и предположит, что он мертв.

потокобезопасность

В одной и той же группе поток не может запускать несколько потребителей, а несколько потоков не могут безопасно совместно использовать потребителя. Согласно правилам, один потребитель использует один поток.Если несколько потребителей в группе потребителей хотят работать, то каждый потребитель должен работать в своем собственном потоке, который можно использовать в Java.ExecutorServiceЗапустите несколько потребителей для обработки.

Конфигурация потребителя

До сих пор мы научились использовать потребительский API, но были введены только самые основные свойства, а в документации Kafka перечислены все инструкции по настройке, относящиеся к потребителям. Большинство параметров имеют разумные значения по умолчанию, обычно их не нужно изменять, давайте введем эти параметры.

  • fetch.min.bytes

Это свойство указывает минимальное количество байтов, которое потребитель может получить с сервера. Когда брокер получает запрос данных от потребителя, если объем доступных данных меньшеfetch.min.bytesуказанный размер, то он будет ждать, пока будет доступно достаточно данных, прежде чем вернуть его потребителю. Это снижает нагрузку на потребителей и брокеров, поскольку им не нужно обрабатывать сообщения туда и обратно, когда тема не используется очень часто. Если доступных данных не так много, но загрузка ЦП потребителя высока, то это свойство необходимо установить выше значения по умолчанию. Если количество потребителей велико, увеличение значения этого свойства может снизить нагрузку на брокера.

  • fetch.max.wait.ms

Мы проходим вышеfetch.min.bytesСкажите Kafka подождать, пока не будет достаточно данных, чтобы вернуть их потребителю. иfetch.max.wait.msИспользуется для указания времени ожидания брокера, по умолчанию 500 миллисекунд. Если в kafka поступает недостаточно данных, минимальное количество данных, полученных потребителями, не будет выполнено, что приведет к задержке в 500 миллисекунд. Если вы хотите уменьшить потенциальную задержку, вы можете установить меньшее значение параметра. Если для fetch.max.wait.ms установлена ​​задержка 100 мс, а для fetch.min.bytes — 1 МБ, то Kafka либо вернет 1 МБ данных, либо 100 мс после получения потребительского запроса Все доступные данные. Это зависит от того, какое условие будет выполнено первым.

  • max.partition.fetch.bytes

Это свойство указывает сумму, которую сервер возвращает потребителям из каждого раздела.最大字节数. Его значение по умолчанию равно 1 МБ, т. е.KafkaConsumer.poll()Метод возвращает записи из каждого раздела до байтов, указанных в max.partition.fetch.bytes . Если топик имеет 20 разделов и 5 потребителей, то каждому потребителю нужно至少4 МБ свободной памяти для приема записей. При выделении памяти для потребителей можно выделять им больше, потому что если один из потребителей в группе выйдет из строя, то оставшимся потребителям потребуется иметь дело с большим количеством разделов. Значение max.partition.fetch.bytes должно быть больше, чем максимальное количество байтов, которое может получить брокер (настраивается с помощью свойства max.message.size).В противном случае потребитель не сможет прочитать эти сообщения, в результате чего потребитель зависнет и повторит попытку.. Другим фактором, который следует учитывать при настройке этого свойства, является время, необходимое потребителю для обработки данных. Потребителям необходимо часто вызывать метод poll(), чтобы избежать истечения срока действия сеанса и перебалансировки разделов.Если одним вызовом poll() возвращается слишком много данных, потребителям требуется больше времени для обработки, и они могут быть не в состоянии перейти к следующему раунду. вовремя. запрос, чтобы избежать истечения срока действия сеанса. Если это произойдет, вы можете изменить значение max.partition.fetch.bytes на меньшее или увеличить время истечения сеанса.

  • session.timeout.ms

Это свойство указывает, как долго потребитель может отключиться от сервера, прежде чем он будет считаться мертвым, по умолчанию 3 секунды. Если потребитель неsession.timeout.msЕсли в течение указанного времени координатору группы будет отправлено сердцебиение, оно будет считаться мертвым, и координатор запустит ребалансировку. назначает свои разделы другим потребителям в группе потребителей, этот атрибут такой же, какheartbeat.interval.msТесно связаны. heartbeat.interval.ms указывает, как часто метод poll() отправляет контрольные сообщения координатору группы, а session.timeout.ms указывает, как долго потребители могут ждать без отправки контрольных сигналов. Таким образом, эти два свойства обычно необходимо изменять одновременно.Heartbeat.interval.ms должен быть меньше, чем session.timeout.ms, который обычно составляет одну треть от session.timeout.ms. Если session.timeout.ms равен 3 с, то heartbeat.interval.ms должен быть равен 1 с. Установка session.timeout.ms меньше значения по умолчанию может быстрее обнаруживать и восстанавливать поврежденные узлы, но длительный опрос или сборка мусора могут привести к непреднамеренной перебалансировке. Установка для этого свойства более высокого значения может уменьшить случайную перебалансировку, но для обнаружения сбоев узлов потребуется больше времени.

  • auto.offset.reset

Этот атрибут указывает, что должен делать потребитель при чтении раздела без смещения или если смещение неверно. Его значение по умолчаниюlatest, что означает, что в случае недопустимого смещения потребитель начнет чтение с последней записи. Другое значениеearliest, что означает, что если смещение неверно, потребитель начнет чтение записей раздела с начальной позиции.

  • enable.auto.commit

Позже мы рассмотрим несколько различных способов фиксации смещений. Это свойство указывает, будет ли потребитель автоматически отправлять смещение. Значение по умолчанию — true. Чтобы максимально избежать дублирования данных и потери данных, вы можете установить для него значение false, и вы можете контролировать, когда отправлять смещение. Если вы установите значение true, вы также можете передатьauto.commit.interval.msСвойство для контроля частоты коммитов

  • partition.assignment.strategy

Мы знаем, что раздел будет закреплен за группой потребителей.PartitionAssignorОн решит, какие разделы должны быть выделены какому потребителю на основе заданного потребителя и темы.У Kafka есть две стратегии распределения по умолчанию.RangeиRoundRobin

  • client.id

Это свойство может быть любой строкой и используется брокером для идентификации сообщений, отправляемых клиентом, обычно используемых в журналах, метриках и квотах.

  • max.poll.records

Это свойство используется для управления количеством записей, которые могут быть возвращены одним вызовом метода call(), что может помочь вам контролировать объем данных, которые необходимо обработать при опросе.

  • получить.буфер.байты и отправить.буфер.байты

Размер буфера TCP, используемый сокетом для чтения и записи данных, также может быть установлен. Если они установлены на -1, используются значения операционной системы по умолчанию. Если производитель или потребитель находится в другом центре обработки данных, чем брокер, эти значения могут быть соответствующим образом увеличены, поскольку сеть через центр обработки данных обычно имеет относительно высокую задержку и относительно низкую пропускную способность.

Концепция коммитов и смещений

специальное смещение

Как мы упоминали выше, потребитель звонитpoll()Когда метод выполняет опрос по времени, он возвращает записи, записанные в Kafka производителем, но еще не использованные потребителем, поэтому мы можем отслеживать, какие записи читаются каким потребителем в группе. Потребители могут использовать Kafka для отслеживания местоположения (смещения) сообщений в разделах.

Потребители будут_consumer_offsetСообщение отправляется в специальной теме . Эта тема будет сохранять смещение раздела в каждом отправленном сообщении. Основная функция этой темы — записывать смещение после того, как потребитель инициирует перебалансировку. Потребитель отправляет сообщение в эту тему каждый раз . , при нормальных обстоятельствах перебалансировка не запускается. Эта тема не работает. Когда запускается перебалансировка, потребитель перестает работать, и каждый потребитель может быть назначен на соответствующий раздел. Эта тема позволяет потребителям продолжать обработку сообщений. set .

Если зафиксированное смещение меньше, чем последнее смещение, обработанное клиентом, то сообщения между двумя смещениями будут повторно обработаны.

Если зафиксированное смещение больше, чем смещение на момент последнего потребления, то сообщения между двумя смещениями будут потеряны.

теперь, когда_consumer_offsetТак важно, так как же оно представлено? Давайте поговорим о том, как отправить ####

API KafkaConsumer предоставляет несколько способов фиксации смещений.

автоматическая фиксация

Самый простой способ — позволить потребителям автоматически фиксировать смещения. еслиenable.auto.commitЕсли установлено значение true, то каждые 5 секунд потребитель будет автоматически отправлять максимальное смещение, опрашиваемое из метода poll(). Интервал фиксации задаетсяauto.commit.interval.msУправление, по умолчанию 5с. Как и все остальное в потребителе, автофиксация происходит при опросе. Потребитель проверяет в каждом опросе, зафиксировано ли смещение, и если да, то фиксирует смещение, возвращенное из предыдущего опроса.

зафиксировать текущее смещение

Пучокauto.commit.offsetУстановите значение false, чтобы позволить приложению решать, когда фиксировать смещения. использоватьcommitSync()Зафиксировать смещение. Этот API отправит последнее смещение, возвращенное методом poll(), вернется сразу после успешной отправки и выдаст исключение, если отправка не удалась.

commitSync() зафиксирует последнее смещение, возвращенное poll(). Если вы обработали все записи, обязательно вызовите commitSync(), иначе все равно есть риск потери сообщений. Будут обработаны все сообщения от сообщения до баланса неоднократно.

Асинхронная фиксация

Асинхронная фиксацияcommitAsync()с синхронной фиксациейcommitSync()Самая большая разница заключается в том, что асинхронные фиксации не будут повторяться, в то время как синхронные фиксации будут повторяться последовательно.

Сочетание синхронных и асинхронных коммитов

В нормальных условиях, для случайных сбоев фиксации, отсутствие повторной попытки не является большой проблемой, потому что, если сбой фиксации вызван временной проблемой, последующие фиксации всегда будут успешными. Но если последний коммит перед закрытием потребителя или перебалансировкой, убедитесь, что коммит прошел успешно.

следовательно,Комбинация commitAsync и commitSync обычно используется для фиксации смещений до того, как потребитель закроется..

Зафиксировать определенное смещение

Потребительский API позволяет вызывать методы commitSync() и commitAsync() с картой разделов и смещений, которые необходимо зафиксировать, то есть зафиксировать определенное смещение.

Давайте сделаем рекламу для себя. Добро пожаловать в официальный аккаунт Java Builder. Владелец аккаунта - стек технологий Java. Он любит технологии, любит читать, любит делиться и подводить итоги. Я надеюсь поделиться с вами каждой хорошей статьей на пути роста. Подпишитесь на официальный аккаунт, чтобы ответить 002, чтобы получить специально подготовленный для вас большой подарочный пакет, который вам обязательно понравится и вы соберете его.

Ссылка на статью:

Самое подробное изложение принципов в истории Кафки

Полное руководство по Кафке

kafka.apache.org/

kafka.apache.org/document ATI…

woohoo.tutorial kart.com/apache-kafka…

D zone.com/articles/ Я тоже…

«Geek Time — основные технологии и практика Kafka»