[Кафка] "Полное руководство по Кафке" - Чтение данных из Кафки

Java

Приложения используют KafkaConsumer для подписки тем на Kafka и получения сообщений из подписанных тем. Чтение данных из Kafka отличается от чтения данных из других автоматических систем и включает в себя некоторые уникальные концепции и идеи. Трудно понять, как использовать потребительский API без предварительного понимания этих концепций. Итак, давайте сначала объясним эти важные концепции, а затем приведем несколько примеров, чтобы показать, как различные приложения могут быть реализованы с помощью потребительского API.

Потребители и группы потребителей

Предположим, у нас есть приложение, которому необходимо читать сообщения из темы Kafka и проверять эти сообщения перед их сохранением. Приложению необходимо создать объект-потребитель, подписаться на тему и начать получать сообщения, затем проверять сообщения и сохранять результаты. Через некоторое время производитель пишет сообщения в тему быстрее, чем приложение может проверить данные.Что делать?Если для обработки сообщений используется только один потребитель, приложение не сможет угнаться за скоростью сообщения поколение. Очевидно, что в настоящее время необходимо горизонтальное масштабирование потребителей. Точно так же, как несколько производителей могут писать сообщения в одну и ту же тему, мы также можем использовать нескольких потребителей для чтения сообщений из одной темы и разгрузки сообщений.

Потребители Kafka принадлежат к потребительским группам. Потребители в группе подписываются на одну и ту же тему, и каждый потребитель получает сообщения из части разделов темы.

Тема T1 предполагает, что есть четыре раздетра, мы создали потребителя C1, что является единственным в группе G1 потребителей, мы используем его, чтобы подписаться на темы T1. CL1 Потребители получат сообщение, связанное со всеми четырьмя разделами T1, показанные на рисунке 4-1.

Если вы добавите группу потребителей G1 в C2, каждый потребитель получит сообщение из двух разделов. Я предполагаю, что потребитель С1 получает сообщения раздела 0 и раздела 2, потребители С2 получают сообщения Области 1 и Области 3, как показано на рис. 4-2.

Если в группе G1 4 потребителя, то каждый потребитель может быть назначен на раздел, как показано на рис. 4-3.

Если мы добавим в группу потребителей больше, чем количество разделов в топике, лишние потребители будут простаивать и не будут получать никаких сообщений.

Добавление потребителей в группу является основным способом горизонтального масштабирования покупательной способности. Потребители Kafka часто выполняют операции с высокой задержкой, такие как запись данных в базу данных или HDFS, или используют данные для трудоемких вычислений. В этих случаях один потребитель не может угнаться за скоростью генерации данных, поэтому для разделения нагрузки можно добавить больше потребителей, и каждый потребитель обрабатывает сообщения только из некоторых разделов, что является основным средством горизонтального масштабирования. Нам необходимо создать большое количество разделов для топика, чтобы по мере роста нагрузки можно было добавлять больше потребителей. Но будьте осторожны, не позволяйте количеству потребителей превышать количество тематических разделов, избыточные потребители просто будут простаивать.

Помимо горизонтального масштабирования одного приложения путем добавления потребителей, часто бывает, что несколько приложений считывают данные из одной и той же темы. На самом деле, одна из основных целей дизайна Kafka — сделать так, чтобы данные в разделах Kafka соответствовали потребностям различных сценариев корпоративных приложений. В этих сценариях каждое приложение может получать все сообщения, а не только некоторые из них. Пока у каждого приложения есть своя группа потребителей, они могут получать все сообщения по теме. В отличие от традиционных систем обмена сообщениями, горизонтальное масштабирование потребителей и групп потребителей Kafka не оказывает негативного влияния на производительность.

В приведенном выше примере, если добавляется новая группа G2 только с одним потребителем, то этот потребитель будет получать все сообщения из темы T1, не затрагивая группу G1. Группа G2 может добавлять больше потребителей, и каждый потребитель может использовать несколько разделов, как и группа G1, как показано на рис. 4-5. В общем, группа G2 по-прежнему будет получать все сообщения, независимо от того, существуют ли другие группы.

Короче говоря, создайте группу потребителей для каждого приложения, которое должно получать все сообщения по одной или нескольким темам, а затем добавьте потребителей в группу, чтобы масштабировать возможности чтения и обработки.Потребители обрабатывают только часть сообщения.

Группы потребителей и ребалансировка разделов

Мы уже узнали из предыдущего раздела, которые потребители в Группе прочитают разделы темы вместе. Когда новый потребитель присоединяется к группе, он читает сообщения, изначально читаю другими потребителями. Когда потребитель выключается или выключается, он оставляет группу, и перегородки, которые были прочитаны им, будут прочитаны другими потребителями в Группе. Перераспределение разбиения возникает, когда тема изменяется, например, когда администратор добавляет новый раздел.

Передача права собственности на раздел от одного потребителя к другому называется ребалансировкой. Перебалансировка важна для обеспечения высокой доступности и масштабируемости для групп потребителей (мы можем безопасно добавлять или удалять потребителей), но мы не хотим, чтобы это происходило в обычных обстоятельствах. Во время перебалансировки потребители не могут читать сообщения, в результате чего вся группа становится недоступной в течение короткого периода времени. Кроме того, когда раздел переназначается другому потребителю, текущее состояние чтения потребителя теряется, и ему может потребоваться очистить кэш, что замедлит работу приложения до тех пор, пока оно не сможет восстановить состояние. В этой главе мы обсудим, как выполнить безопасную перебалансировку и как избежать ненужной перебалансировки.

Потребители сохраняют свою принадлежность к группам и право собственности на разделы, отправляя тактовые импульсы брокерам, назначенным координаторами групп (у разных групп могут быть разные координаторы). Пока потребитель отправляет тактовые импульсы с регулярными интервалами, он считается активным, что указывает на то, что он все еще читает сообщения в разделе. Потребители отправляют тактовые импульсы при опросе сообщений (для получения сообщений) или совершении смещений. Если потребитель перестанет отправлять тактовые импульсы достаточно долго, срок действия сеанса истечет, и координатор группы решит, что он мертв, что вызовет перебалансировку.

Если потребитель выйдет из строя и скважина перестанет читать сообщения, координатор группы (брокер) подождет несколько секунд, чтобы подтвердить, что он мертв, прежде чем запускать перебалансировку. В течение этих нескольких секунд мертвый потребитель не будет читать сообщения в разделе. При очистке потребителей потребитель уведомит координатора о том, что он покидает группу, и координатор немедленно инициирует перебалансировку, чтобы свести к минимуму паузы в обработке. В оставшейся части этой главы мы обсудим некоторые параметры конфигурации, которые управляют частотой отправки контрольных сигналов и сроком действия сеанса, а также способы настройки этих параметров в соответствии с фактическими потребностями.

Как происходит процесс выделения разделов?

Когда потребитель хочет присоединиться к группе, он отправляет запрос JoinGroup координатору группы. Первый потребитель, присоединившийся к группе, станет «владельцем группы». Владелец группы получает от координатора список членов группы (список содержит всех потребителей, которые недавно отправили контрольные сообщения, которые считаются активными), и отвечает за назначение разделов каждому потребителю. Он использует класс, реализующий интерфейс PartitionAssignor, чтобы решить, какие разделы должны быть назначены какому потребителю.

Kafka построила две стратегии распространения, которые мы обсудим в следующем разделе параметров конфигурации. После того, как распределение завершено, основная группа рассылки списка отправляется координатору группы, координатору, а затем отправляет информацию всем потребителям. Каждый потребитель может видеть только свою собственную информацию о распределении, только основная группа в группе знает информацию о распределении для всех потребителей. Этот процесс повторяется каждый раз при повторной балансировке.

Создайте потребителя Kafka

Перед чтением сообщений необходимо создать объект KafkaConsumer. Создание объекта KafkaConsumer очень похоже на создание объекта KafkaProducer — поместите свойства, которые вы хотите передать потребителю, в объект Properties. Все свойства подробно обсуждаются в последующих разделах этой главы. Здесь нам просто нужно использовать 3 необходимых свойства: bootstrap.servers, key.deserializer, value.deserializer.

Следующий код демонстрирует, как создать объект KafkaConsumer:

Properties props = new Properties();
 
props.put("bootstrap.servers", "broker1:9092, broker2:9092");
 
props.put("group.id", "CountryCounter");
 
props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

Десериализатор преобразует массив байтов в объект Java, используя указанный класс (десериализатор).

group.id указывает, к какой группе потребителей принадлежит KafkaConsumer. group.id не требуется, но пока мы будем считать его обязательным. Он указывает, к какой группе потребителей принадлежит KafkaConsumer. Также можно создавать потребителей, не принадлежащих ни к одной группе, но это делается реже.

Подписывайтесь на темы

Следующим шагом после создания потребителя является начало подписки на тему. Метод subscribe() принимает в качестве параметра список тем.

consumer.subscribe(Collections.singletonList("customerCountries"));

Здесь мы создаем список с одним элементом, тема называется «customerCountries», мы также можем передать регулярное выражение при вызове метода subscribe(), регулярное выражение может соответствовать нескольким темам, если кто-то создал новую тему, и тема имя соответствует регулярному выражению, немедленно вызовет перебалансировку, и потребители смогут прочитать только что добавленную тему. Этот тип подписки удобен, если приложению необходимо читать несколько разделов и оно может обрабатывать разные типы данных. При репликации данных между Kafka и другими системами обычной практикой является подписка на несколько тем с использованием регулярных выражений.

Чтобы подписаться на все темы, связанные с тестированием, сделайте следующее: Consumer.subscribe("test.*");

голосование

Опрос сообщений — это ядро ​​потребительского API, запрашивающего данные с сервера посредством простого опроса. Как только потребитель подписывается на тему, опрос обрабатывает все детали, включая групповую координацию, перебалансировку разделов, отправку тактов и выборку данных, а разработчикам нужно только использовать простой набор API-интерфейсов для обработки данных, возвращаемых из разделов. Основная часть потребительского кода выглядит так:

Опрос — это больше, чем просто выборка данных. При первом вызове метода poll() нового потребителя он отвечает за поиск GroupCoordinator, затем присоединение к группе и принятие назначенного раздела. Если происходит ребалансировка, весь процесс также выполняется во время опроса. Конечно, сердцебиение также отправляется из опроса. Итак, мы хотим убедиться, что любая обработка, выполняемая во время опроса, должна выполняться как можно быстрее.

потокобезопасность

У нас не может быть ни одного потока, выполняющего несколько потребителей в одной группе, ни нескольких потоков, которые могут безопасно использовать одного потребителя. Как правило, один потребитель использует один поток. Если вы хотите запустить несколько потребителей в одной и той же группе потребителей, вам нужно разрешить каждому потребителю работать в своем собственном потоке. Лучше всего инкапсулировать логику потребителя в его собственном объекте, а затем использовать ExecutorService Java для запуска нескольких потоков, чтобы каждый потребитель работал в своем собственном потоке.

конфигурация потребителя

До сих пор мы научились использовать потребительский API, но были введены только несколько свойств конфигурации — bootstrap.servers, key.deserializer, value.deserializer, group.id. В документации Kafka перечислены все инструкции по настройке, относящиеся к потребителям. Большинство параметров имеют разумные значения по умолчанию и, как правило, не нуждаются в изменении, хотя есть некоторые параметры, которые имеют непосредственное отношение к потребительской производительности и доступности. Эти важные свойства описаны далее.

1. fetch.min.bytes

Это свойство указывает минимальное количество байтов, которое потребитель может получить с сервера. Когда брокер получает запрос данных от потребителя, если объем доступных данных меньше размера, указанного в fetch.min.bytes, он будет ждать, пока не будет достаточно доступных данных, прежде чем вернуть их потребителю. Это снижает нагрузку на потребителей и брокеров, поскольку им не нужно обрабатывать сообщения туда и обратно, когда тема не очень активна (или в нерабочее время дня). Если доступных данных не так много, но загрузка ЦП потребителя высока, то это свойство необходимо установить выше значения по умолчанию. Если количество потребителей велико, установка для этого свойства большего значения может уменьшить рабочую нагрузку брокера.

2. fetch.max.wait.ms

Мы говорим Kafka через fetch.min.bytes подождать, пока не будет достаточно данных, чтобы вернуть их потребителю. А fetch.max.wait.ms используется для указания времени ожидания брокера, по умолчанию 500 мс. Если в Kafka поступает недостаточно данных, требование потребителя о получении минимального объема данных не может быть выполнено, что приводит к задержке в 500 мс. Если вы хотите уменьшить потенциальную задержку (для выполнения SLA), вы можете установить для этого параметра более низкое значение. Если для fetch.max.wait.ms установлено значение 100 мс, а для fetch.min.bytes установлено значение 1 МБ, то Kafka либо вернет 1 МБ данных после получения запроса потребителя, либо вернет все доступные данные через 100 мс. Это зависит от того, какое условие встречается первым.

3. max.parition.fetch.bytes

Это свойство указывает максимальное количество байтов, которое сервер возвращает потребителям из каждого раздела. Его значение по умолчанию — 1 МБ, то есть метод KafkaConsumer.poll() возвращает не более байтов, указанных max.parition.fetch.bytes из каждого раздела. Если тема имеет 20 разделов и 5 потребителей, то каждому потребителю требуется не менее 4 МБ свободной памяти для получения записей. При выделении памяти для потребителей можно выделять им больше, потому что если один из потребителей в группе выйдет из строя, то оставшимся потребителям потребуется иметь дело с большим количеством разделов. Значение max.parition.fetch.bytes должно быть больше, чем максимальное количество байтов сообщений, которое может получить брокер (настраивается через свойство max.message.size), иначе потребитель не сможет прочитать эти сообщения, заставляя потребителя зависать все время попытки. Другим фактором, который следует учитывать при установке этого свойства, является время, необходимое потребителю для обработки данных. Потребителям необходимо часто вызывать метод poll(), чтобы избежать истечения срока действия сеанса и перебалансировки разделов.Если слишком много данных возвращается одним вызовом poll(), потребителям требуется больше времени для обработки, и они могут быть не в состоянии выполнить следующий опрос в время, чтобы избежать истечения сеанса. Если это произойдет, вы можете изменить значение max.parition.fetch.bytes на меньшее значение или увеличить время истечения сеанса.

4. session.timeout.ms

Это свойство указывает, как долго потребитель может отключиться от сервера, прежде чем он будет считаться мертвым, по умолчанию 3 секунды. Если потребитель не отправляет пульс координатору группы в течение времени, указанного в session.timeout.ms, он считается мертвым, и координатор инициирует перебалансировку, чтобы назначить свои разделы другим потребителям в группе. Это свойство тесно связано с heartbeat.interval.ms. heartbeat.interval.ms указывает, как часто метод poll() отправляет контрольные сигналы координатору, а session.timeout.ms указывает, как долго потребитель может ждать без отправки контрольных сигналов. Поэтому, как правило, необходимо изменять эти два свойства одновременно, а значение heartbeat.interval.ms должно быть меньше, чем session.timeout.ms, обычно это треть от session.timeout.ms. Если session.timeout.ms равен 3 с, то heartbeat.interval.ms должен быть равен ls. Установка session.timeout.ms меньше значения по умолчанию может быстрее обнаруживать и восстанавливать поврежденные узлы, но длительный опрос или сборка мусора могут вызвать непреднамеренную перебалансировку. Установка большего значения для этого свойства может уменьшить количество случайных перебалансировок, но обнаружение сбоев узлов займет больше времени.

5. auto.offset.reset

Это свойство указывает, что потребитель должен делать при чтении раздела без смещения или если смещение недействителен (потому что потребитель истек в течение длительного времени, запись, содержащая смещение, стала устаревшей и удалена). Его значение по умолчанию является последним, что означает, что в случае неверного смещения потребитель начнет чтение данных из последней записи (записи, сгенерированные после начала потребителя). Другое значение самое раннее, что означает, что потребитель будет прочитать записи раздела из исходной позиции, если смещение недействительна.

6. enable.auto.commit

Позже мы рассмотрим несколько различных способов фиксации смещений. Это свойство указывает, фиксирует ли потребитель смещение автоматически, значение по умолчанию — true. Чтобы максимально избежать дублирования данных и потери данных, вы можете установить для него значение false и контролировать, когда смещение фиксируется. Если установлено значение true, вы также можете контролировать частоту коммитов, настроив свойство auto.commit.interval.mls.

7. partition.assignment.strategy

Мы знаем, что разделы назначаются потребителям в группах. PartitionAssignor решает, какие разделы должны быть назначены какому потребителю на основе заданного потребителя и темы. У Kafka есть две стратегии распределения по умолчанию.

- Range

Эта стратегия назначает потребителям несколько смежных разделов темы. Предположим, что потребитель C1 и потребитель C2 подписываются на тему T1 и тему T2 одновременно, и каждая тема имеет 3 раздела. Тогда потребителю C1 можно назначить раздел 0 и раздел 1 этих двух тем, а потребителю C2 назначить раздел 2 этих двух тем. Поскольку каждый раздел имеет нечетное количество разделов, а распределение выполняется независимо внутри раздела, первый потребитель в конечном итоге распределяется на большее количество разделов, чем второй потребитель. Это происходит всякий раз, когда используется стратегия диапазона, а количество разделов не делится на количество потребителей.

- RoundRobin

Эта стратегия назначает потребителям все разделы темы один за другим. Если для назначения разделов потребителю C1 и потребителю C2 используется стратегия RoundRobin, то потребитель C1 будет назначен разделу 0 и 2 темы T1 и разделу 1 темы T2, а потребитель C2 будет назначен разделу l темы T1 и Раздел 0 и Раздел 2 темы T2. В общем, если все потребители подписываются на одну и ту же тему (что является обычным явлением), стратегия RoundRobin назначает всем потребителям одинаковое количество разделов (или не более одного раздела).

Стратегию раздела можно выбрать, установив partition.assignment.strategy . По умолчанию используется org.apache.kafka.clients.consumer.RangeAssignor Этот класс реализует стратегию Range, но его можно изменить на org.apache.kafka.clients.consumer.RoundRobinAssignor. Мы также можем использовать собственные стратегии, и в этом случае значением свойства partition.assignment.strategy является имя пользовательского класса.

8. client.id

Это свойство может быть любой строкой и используется брокером для идентификации сообщений, отправляемых клиентом, обычно используемых в журналах, метриках и квотах.

9. max.poll.records

Это свойство используется для управления количеством записей, которые могут быть возвращены одним вызовом метода call(), что может помочь вам контролировать объем данных, которые необходимо обработать в опросе.

10. получить.буфер.байты и отправить.буфер.байты

Также можно установить размер буфера TCP, используемого сокетом для чтения и записи данных. Если они установлены на -1, используются значения операционной системы по умолчанию. Если производитель или потребитель находится в другом центре обработки данных, чем брокер, эти значения могут быть соответствующим образом увеличены, поскольку сеть в центре обработки данных обычно имеет более высокую задержку и более низкую пропускную способность.