Это 21-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления
предисловие
Производитель Kafka был объяснен ранее, а соответствующая продукция является потребителем.В программе вы можете подписаться на тему через KafkaConsumer и получать сообщения из темы, на которую подписан. В Kafka потребители имеют концепцию большего количества групп, чем производителей, также известных как группы потребителей, тем самым повышая скорость потребления одной машины. В этой статье будет представлена концепция потребителей и групп потребителей, а затем подробно объяснено развитие клиента.
1. Потребители и группы потребителей
Потребители несут ответственность за подписку на темы в Kafka и извлечение из них сообщений, но, в отличие от производителей, он добавляет концепцию групп потребителей, поскольку потребители Kafka часто выполняют некоторые действия при потреблении сообщений. Действия с высокой задержкой, такие как запись данных в базу данных, чтение данных для расчета и обработки и т. д., намного медленнее, чем производитель.Поэтому увеличение групп потребителей используется для улучшения потребительской способности Кафки.Когда то же самое Когда сообщение темы придет снова, эти сообщения будут потребляется потребителями одной потребительской группы.
1.1 Графическая потребительская модель
Далее, давайте посмотрим на процесс этого потребления.
Дело 1:
Например, в компании есть служба печати. Предположим, что есть 6 разделов печати, соответствующих содержанию шести разделов: цветная печать word, цветная печать excel, цветная печать ppt, черно-белая word, черно-белая excel и черно-белый ppt В настоящее время нужен только один принтер. следующим образом
Сценарий второй:
Однако эффект печати этого принтера слишком медленный, и многие люди целыми днями толпятся в типографии и стоят в очереди, чтобы распечатать документы. В это время компания приобрела новый принтер, и пусть они обрабатывают запросы на печать отдельно, их можно поместить в группу потребителей и использовать данные этих разделов одновременно.
В этот момент они обрабатывают данные, выделенные разделам отдельно, и логически не мешают друг другу.Сообщения в одной теме будут опубликованы только для одного потребителя в группе потребителей..
Сценарий третий:
В это время компания хотела добавить функцию резервного копирования печати, поэтому она приобрела еще один принтер для синхронной печати всех файлов печати. (ps: я не знаю, что это за странная компания, я просто привел пример для сцены~)
следующим образом:
В это время данные каждого раздела будут отправлены группе потребителей B, а именноСообщения в одном и том же разделе могут потребляться потребителями из разных групп потребителей..
Случай четвертый:
В это время, чтобы подготовиться к финансированию, компания продемонстрировала свои мускулы инвесторам, поэтому купила еще пять принтеров.Сцена в это время выглядит следующим образом:
Хотя модель потребителей и групп потребителей может сделать общую емкость потребления горизонтально масштабируемой, в случае фиксированных разделов добавление потребителей не обязательно улучшает емкость потребления.Как показано на рисунке, в настоящее время принтер не может быть назначен. к разделу и не может потреблять данные.
1.2 Режим доставки сообщений
Ранее было сказано о двух режимах очередей сообщений, а именно点对点
а также发布订阅模式
. Kafka поддерживает оба режима одновременно. Следующее понимание имеет решающее значение.
- Одноранговый режим основан на очередях, аналогичных данным в той же группе потребителей.Производитель отправляет данные в раздел, а затем потребитель извлекает разделенные сообщения для потребления.В это время сообщения могут быть только потребляется потребителями из одной потребительской группы один раз.
- Режим публикации-подписки заключается в том, что сообщения раздела в kafka могут потребляться потребителями в разных группах потребителей. Это приложение в режиме широковещательной передачи «один ко многим».
Конечно, группа потребителей — это логическое понятие, которое настраивается через клиентский параметр group.id, а значением по умолчанию является пустая строка. Потребитель — это не логическое понятие, это сущность, которая фактически потребляет данные, которая может быть потоком или машиной.
Ну, я понимаю концепцию потребителей и потребительских групп.Далее мы официально открываем ящик Пандоры потребительского клиента.
Во-вторых, приложение потребителей Kafka
Точно так же потребители также зависят от клиентов Kafka.Нормальная логика потребления состоит из следующих шагов:
- 1. Настройте параметры клиента-потребителя и создайте соответствующие экземпляры-потребители.
- 2. Подпишитесь на тему
- 3. Получайте сообщения и потребляйте
- 4. Отправьте смещение потребления
- 5. Закройте экземпляр потребителя
Мы можем не знать, что здесь означает смещение, не волнуйтесь, мы поговорим об этом позже, давайте посмотрим, как оно должно быть записано для следующего типичного потребителя.
2.1 Демонстрация потребительского клиента
public class Consumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.81.101:9092");
props.put("group.id", "test"); //消费者组
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("xiaolei2"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
2.2 Необходимая конфигурация параметров
При создании потребителя у Kafka есть 4 обязательных параметра, на один больше, чем у производителя.
-
bootstrap.servers : этот параметр используется для указания списка адресов брокеров, подключающихся к кластеру Kafka.Это может быть один адрес или адрес кластера Kafka может быть разделен запятыми.
-
key.deserializer и value.deserializer: поскольку ключ и значение сериализуются для создания массива байтов при отправке сообщения, их необходимо десериализовать в исходные данные при использовании данных.
-
group.id : имя группы, в которой находится потребитель, значение по умолчанию — «», если оно установлено пустым, будет выдано исключение
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
2.3 Темы и разделы подписки
После создания потребителя нам нужно подписаться на связанные с ним темы, потребитель может подписаться на одну или несколько тем. Здесь доступны два API
- consumer.subscribe(Collection topics): указывает набор тем, на которые необходимо подписаться;
- consumer.subscribe(Pattern pattern): используйте регулярные выражения для сопоставления коллекций, на которые необходимо подписаться.
Для коллекции, на которую он подписан, мы можем легко понять, что Kafka может сопоставлять связанные темы с помощью регулярных выражений, таких как следующие:
consumer.subscribe(Pattern.compile("topic-.*"));
Однако, если потребитель определен повторно, последний имеет преимущественную силу, а тема xiaolei3 подписывается ниже.
consumer.subscribe(Arrays.asList("xiaolei2"));
consumer.subscribe(Arrays.asList("xiaolei3"));
После подписки на тему поговорим о том, как он определяет разделы.
Подпишитесь непосредственно на определенный раздел.
consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));
Метод assing используется здесь для подписки на определенный раздел. Что делать, если вы не знаете, какие там разделы?
Вы можете использовать метод partitionsFor() класса KafkaConsumer для запроса информации о метаданных указанной темы.
Этим было достигнуто следующее:
consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor("xiaolei2");
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(topicPartitions);
Наконец, потребление в Kafka основано на вытягивании, и существует два типа потребления сообщений:
- Один из них — push: сервер активно отправляет сообщения потребителям, например, отправляет статьи в общедоступные учетные записи WeChat.
- Одним из них является pull (опрос): потребитель активно инициирует запрос к серверу для его получения.
Kafka нужно только опрашивать API, чтобы периодически запрашивать данные с сервера. После того, как потребитель подпишется на тему, опрос будет обрабатывать все детали, такие как отправка пульса, получение данных, перебалансировка разделов и т. д. А мы занимаемся бизнесом.
3. Смещение потребления
3.1 Что такое смещение
Для раздела Kafka каждое сообщение имеет уникальное смещение, которое используется для отображения соответствующей позиции сообщения в разделе, которое является монотонно возрастающим целым числом. После версии 0.9 смещения Kafka хранятся в разделе Kafka _consumer_offsets. Потребитель отправит смещение потребления в эту тему после потребления сообщения. Когда потребитель перезапустится, он начнет потреблять сообщения из нового смещения потребления.
потому что,Коммит смещения выполняется после того, как все извлеченные сообщения были использованы., что может произойти, если смещение зафиксировано неправильно数据丢失
или重复消费
.
- Если возникает исключение и происходит сбой, когда потребление достигает x+2, после устранения сбоя повторное получение сообщения все еще начинается с x, тогда данные от x до x+2 будут повторно потребляться.
- Если смещение отправлено заранее, когда потребление достигает х + 2, сообщение не было потреблено в это время, а затем происходит сбой.После перезапуска потребление начинается с нового смещения х + 5, затем от х + 2 до x+ 5 Промежуточные сообщения теряются.
Поэтому особенно важно, когда подавать смещение.В Kafka представление смещения делится на ручное представление и автоматическое представление.Объясняются следующие два типа представлений.
3.2 Автоматическая фиксация смещений
Метод представления смещения потребления по умолчанию в Kafka:автоматическая фиксация. Это настраивается в параметре клиента-потребителя enable.auto.commit, и значение по умолчанию — true. Это максимальное смещение сообщения, которое периодически опрашивается в _comsumer_offsets. Периодическое время настраивается в auto.commit.interval.ms, по умолчанию 5 с.
Хотя очень удобно автоматически отправлять смещение потребления, это более лаконично, но проблема в том, что данные проблематичны, что является потерей данных и повторным потреблением, мы говорим.Поэтому Кафка обеспечивает ручное смещение подачи.Количество, более гибкая обработка потребительского смещения.
3.3 вручную совершать коммиты
Предпосылкой включения смещения фиксации вручную является отключение конфигурации автоматической фиксации и изменение конфигурации enable.auto.commit на false.
В соответствии с потребностями пользователя это значение смещения можно разделить на две категории:
- Обычный, максимальное смещение, до которого вручную фиксируется извлечение.
- Вручную зафиксируйте смещение с фиксированным значением.
Существует два способа ручной фиксации смещений: commitSync (синхронная фиксация) и commitAsync (асинхронная фиксация). Сходство между ними состоит в том, что оба будутПредставлено наибольшее смещение пакета данных в этом опросе.; Разница в том, что commitSync блокирует текущий поток до тех пор, пока отправка не будет успешной, и автоматически не сможет повторить попытку (вызванные неконтролируемыми факторами, также будут происходить сбои отправки); в то время как commitAsync не имеет механизма повторной попытки при ошибке, поэтому отправка может завершиться неудачно. .
3.3.1 Синхронная фиксация смещения
Поскольку синхронная отправка смещений имеет механизм повторной попытки при сбое, она более надежна.
public class CustomComsumer {
public static void main(String[] args) {
Properties props = new Properties();
//Kafka集群
props.put("bootstrap.servers", "hadoop102:9092");
//消费者组,只要group.id相同,就属于同一个消费者组
props.put("group.id", "test");
props.put("enable.auto.commit", "false");//关闭自动提交offset
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
while (true) {
//消费者拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//同步提交,当前线程会阻塞直到offset提交成功
consumer.commitSync();
}
}
}
3.3.2 Асинхронная фиксация смещения
Хотя синхронная фиксация смещения более надежна, она будет блокировать текущий поток до тех пор, пока фиксация не будет успешной. Поэтому пропускная способность сильно пострадает. Поэтому в большем количестве случаев будет использоваться метод асинхронной подачи зачетов.
Ниже приведен пример асинхронной отправки смещений:
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
//Kafka集群
props.put("bootstrap.servers", "hadoop102:9092");
//消费者组,只要group.id相同,就属于同一个消费者组
props.put("group.id", "test");
//关闭自动提交offset
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("first"));//消费者订阅主题
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);//消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
//异步提交
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
}
});
}
}
}
Асинхронная отправка может повысить производительность программы, потому что в этот момент вы можете просто запросить данные, не дожидаясь ответа.
Так же бывают сбои при подаче асинхронно Допустим первый раз подается смещение 100, но подача не проходит, а второй раз подается смещение 200. Как с этим быть?
Если вы повторите попытку и снова отправите смещение 100, на этот раз отправка будет успешной, смещение 200 будет перезаписано, и на этот раз оно станет 100. Потом будет повторное потребление, и дальше потреблять со 100.
Следовательно, по этой причине можно использовать комбинацию синхронного + асинхронного.После 100 представлений вы должны дождаться успешного выполнения запроса, прежде чем отправлять 200 смещений.
3.3.3 Синхронная плюс асинхронная фиксация
Асинхронная отправка используется при обычном опросе для обеспечения пропускной способности, но до закрытия конечного потребителя или после возникновения исключения синхронная отправка используется для обеспечения успешной окончательной отправки. Это проверка в конце.
try {
while (true) {
// 拉取消息逻辑处理
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 即将要关闭消费者,同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}
3.4 Указание расхода рабочего объема
Из-за существования смещения потребления мы можем найти местоположение смещения хранения и начать потребление, когда потребитель выключается, перезапускается и перебалансируется, но смещение потребления не всегда присутствует в начале, например, в следующих ситуациях:
- 1. При создании новой группы потребителей
- 2. Потребитель в группе потребителей подписывается на новую тему;
- 3. Информация о смещении темы _comsumer_offsets устарела и удалена
В этих случаях Kafka не может найти смещение потребления и решает, с чего начать потребление, в соответствии с настройкой параметра клиента auto.offset.reset.latest.
- самое раннее: когда есть отправленное смещение в каждом разделе, начать потребление с отправленного смещения, когда нет отправленного смещения, начать потребление с самого начала;
- последний: когда в каждом разделе есть отправленное смещение, начать потребление с отправленного смещения, когда нет отправленного смещения, потреблять вновь сгенерированные данные в этом разделе (значение по умолчанию);
- нет: когда в каждом разделе есть зафиксированное смещение, потребление начинается после смещения; пока в одном разделе нет зафиксированного смещения, NoOffsetForPartitionException генерируется напрямую;
Параметр auto.offset.reset Kafka может позволить нам начать потребление только с начала или с конца в грубой манере и не может указать точное смещение для начала извлечения сообщений, а метод seek() в KafkaConsumer просто предоставляет эту функцию. , что позволяет нам раннее потребление и ретроспективное потребление обеспечить большую гибкость для потребления сообщений.Метод seek() также может сохранять смещение сообщения на внешнем носителе через storeOffsetToDB, а также может взаимодействовать с прослушивателем перебалансировки, чтобы обеспечить более точную возможность потребления .
3.4.1 seek определяет расход рабочего объема.
Метод seek определяется следующим образом:
public void seek(TopicPartition partition, long offset)
- раздел означает раздел
- смещение указывает, где начать потребление в разделе
afkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("xiaolei2"));
consumer.poll(Duration.ofMillis(10000));
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition tp : assignment) {
consumer.seek(tp,100);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
Метод seek() может только сбросить позицию потребления раздела, выделенного потребителем, а выделение раздела осуществляется во время вызова метода poll(), то есть его нужно выполнить один раз перед seek( ) метод poll() ожидает, пока раздел не будет выделен, прежде чем сбрасывать позицию потребления.
Поэтому установите время в методе poll() для ожидания завершения раздела, а затем получите информацию о разделе с помощью метода присваивания() для потребления данных.
Если в методе poll() установлено значение 0, то раздел не может быть получен. Если это время будет слишком большим, это вызовет ненужное ожидание.Давайте посмотрим на оптимизированное решение.
3.4.2 Поиск оптимизации расхода заданного рабочего объема
consumer.subscribe(Arrays.asList("xiaolei2"));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size()==0){
consumer.poll(Duration.ofMillis(100));
assignment=consumer.assignment();
}
for (TopicPartition tp : assignment) {
consumer.seek(tp,100);
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
3.4.3 поиск использует начало или конец раздела.
Если потребители в группе потребителей могут найти смещение потребления при запуске, параметр auto.offset.reset не будет работать, если смещение не выходит за пределы допустимого диапазона. В настоящее время, если вы хотите указать потребление с начала или с конца, вам также понадобится метод seek() для достижения этого.
Если вы потребляете в соответствии с указанным смещением, вам нужно сначала получить смещение в начале или конце каждого раздела. Можно использовать методы beginOffsets() и endOffsets().
Set<TopicPartition> assignment = new HashSet<>();
// 在poll()方法内部执行分区分配逻辑,该循环确保分区已被分配。
// 当分区消息为0时进入此循环,如果不为0,则说明已经成功分配到了分区。
while (assignment.size() == 0) {
consumer.poll(100);
// assignment()方法是用来获取消费者所分配到的分区消息的
// assignment的值为:topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1
assignment = consumer.assignment();
}
// 指定分区从头消费
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = beginOffsets.get(tp);
System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
consumer.seek(tp, offset);
}
// 指定分区从末尾消费
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = endOffsets.get(tp);
System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
consumer.seek(tp, offset);
}
// 再次执行poll()方法,消费拉取到的数据。
// ...(省略)
На самом деле методы seekToBeginning() и seekToEnd() напрямую предоставляются в KafkaConsumer для реализации вышеуказанных функций. Конкретные определения следующие:
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)
Альтернативный код выглядит следующим образом:
Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment);
for (TopicPartition tp : assignment) {
Long offset = beginOffsets.get(tp);
System.out.println("分区 " + tp + " 从 " + offset + " 开始消费");
consumer.seek(tp, offset);
}
3.4.5 Потребление на основе метки времени
Например, если мы хотим использовать сообщение в этот момент позавчера, мы не можем напрямую отследить его до этой позиции.В это время мы можем использовать метод offsetsForTimes KafkaConsumer.
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
Параметр timestampsToSearch метода offsetsForTimes() представляет собой тип Map, где ключ — запрашиваемый раздел, а значение — запрашиваемая временная метка.Этот метод вернет смещение и временную метку, соответствующие первому сообщению, временная метка которого больше или равно времени запроса.
Далее в качестве примера возьмем потребление сообщений за сутки до текущего времени, код такой:
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(100);
assignment = consumer.assignment();
}
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
// 设置查询分区时间戳的条件:获取当前时间前一天之后的消息
timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for(TopicPartition tp: assignment){
OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
// 如果offsetAndTimestamp不为null,则证明当前分区有符合时间戳条件的消息
if (offsetAndTimestamp != null) {
consumer.seek(tp, offsetAndTimestamp.offset());
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 消费记录
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ":" + record.value() + ":" + record.partition() + ":" + record.timestamp());
}
}
4. Контролируйте или ограничивайте потребление
KafkaConsumer предоставляет метод управления скоростью потребления.В какой-то момент мы можем закрыть или приостановить потребление определенного раздела и сначала использовать другие разделы, а затем возобновить потребление этих разделов при выполнении определенных условий.Эти два метода pause() (приостановка потребления) и возобновление() (возобновление потребления).
public void pause(Collection<TopicPartition> partitions) {
this.acquireAndEnsureOpen();
try {
this.log.debug("Pausing partitions {}", partitions);
Iterator var2 = partitions.iterator();
while(var2.hasNext()) {
TopicPartition partition = (TopicPartition)var2.next();
this.subscriptions.pause(partition);
}
} finally {
this.release();
}
}
public void resume(Collection<TopicPartition> partitions) {
this.acquireAndEnsureOpen();
try {
this.log.debug("Resuming partitions {}", partitions);
Iterator var2 = partitions.iterator();
while(var2.hasNext()) {
TopicPartition partition = (TopicPartition)var2.next();
this.subscriptions.resume(partition);
}
} finally {
this.release();
}
}
В дополнение к приостановке и возобновлению Kafka также предоставляет метод обеда paused() для возврата приостановленной коллекции разделов.
public Set<TopicPartition> paused()
5. Ребаланс
Перебалансировка относится к поведению при передаче права собственности на раздел от одного потребителя к другому. Например, когда добавляется новый потребитель, повторная балансировка приведет к повторному разделению раздела и потребителя, обеспечивая высокую доступность и масштабируемость для потребителя. группа сексуальная безопасность.
Пока происходит ребалансировка, потребители в группе потребителей не могут читать сообщения, то есть потребители становятся недоступными на короткий период времени, в течение которого происходит ребалансировка. Кроме того, ребалансировка также может вызвать дублирование сообщений, так как при назначении раздела другому потребителю состояние потребителя на тот момент будет утеряно, в это время нет времени на синхронизацию смещения потребления, и новый потребитель запустится с исходного.Смещение начинает потреблять, поэтому старайтесь избегать возникновения ребалансировки.
Мы можем использовать перегруженный метод subscribe для передачи пользовательского прослушивателя перебалансировки разделов.
/*订阅指定集合内的所有主题*/
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
/*使用正则匹配需要订阅的主题*/
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
код показывает, как показано ниже:
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
/*该方法会在消费者停止读取消息之后,再均衡开始之前就调用*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("再均衡即将触发");
// 提交已经处理的偏移量
consumer.commitSync(currentOffsets);
// 清除局部变量
currentOffsets.clear();
}
/*该方法会在重新分配分区之后,消费者开始读取消息之前被调用*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
//TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加
currentOffsets.put(topicPartition, offsetAndMetadata);
}
consumer.commitAsync(currentOffsets, null);
}
} finally {
consumer.close();
}
В коде смещение сообщения временно сохраняется в локальной переменной currentOffsets, а смещение потребления может быть отправлено асинхронно во время обычного потребления, но до того, как произойдет действие перебалансировки, функция обратного вызова onPartitionsRevoked отправляется синхронно, чтобы избежать повторного потребления перебалансировки.
6. Перехватчик
Подобно механизму перехватчика клиента-производителя, логика перехватчика также определяется в клиенте-потребителе kafka путем реализацииConsumerInterceptor
реализовать пользовательскую логику перехватчика,ConsumerInterceptor
Существует три основных метода:
- public ConsumerRecords
onConsume(ConsumerRecords records) Потребитель вызовет этот метод до того, как метод опроса вернется, чтобы настроить сообщение, например изменить содержимое сообщения, отфильтровать сообщение в соответствии с определенными правилами и т. д. - public void onCommit(Map
offsets) Потребитель вызовет этот метод после отправки смещения потребления, и в этом методе можно отследить соответствующую информацию отправленного смещения. - public void close() : закрыть
public class ConsumerInterceptorPrefix implements ConsumerInterceptor<String,String> {
@Override
public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<ConsumerRecord<String, String>> recs = consumerRecords.records(partition);
List<ConsumerRecord<String, String>> newRecs = new ArrayList<>();
for(ConsumerRecord<String,String> rec:recs){
String newValue = "xiaolei-"+rec.value();
ConsumerRecord<String,String> newRec = new ConsumerRecord<>(rec.topic(),
rec.partition(),rec.offset(),rec.key(),newValue);
newRecs.add(newRec);
}
newRecords.put(partition,newRecs);
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void close() {
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
map.forEach((tp,offsetAndMetadata) -> {
System.out.println(tp+" : "+offsetAndMetadata.offset());
});
}
@Override
public void configure(Map<String, ?> map) {
}
}
Добавьте перехватчик в класс конфигурации
props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorPrefix.class.getName());
7. Важные потребительские параметры
7.1 fetch.min.bytes
Это свойство указывает минимальное количество байтов, которое потребитель может получить с сервера. Когда брокер получает запрос данных от потребителя, если объем доступных данных меньше размера, указанного в fetch.min.bytes, он будет ждать, пока не будет достаточно доступных данных, прежде чем вернуть их потребителю. Это снижает нагрузку на потребителей и брокеров, поскольку им не нужно обрабатывать сообщения туда и обратно, когда тема не очень активна. Если доступных данных не так много, но загрузка ЦП потребителя высока, значение этого свойства можно установить больше, чем значение по умолчанию. Если количество потребителей велико, установка для этого свойства большего значения может уменьшить рабочую нагрузку брокера.
7.2 fetch.max.wait.ms
Это свойство указывает время ожидания возврата сообщения брокером, по умолчанию 500 мс. Если в Kafka поступает недостаточно данных, требование потребителя о получении минимального объема данных не может быть выполнено, что приводит к задержке в 500 мс. Если вы хотите уменьшить возможную задержку (чтобы выполнить SLA), вы можете установить значение этого параметра меньше. Если для fetch.max.wait.ms установлено значение 100 мс, а для fetch.min.bytes установлено значение 1 МБ, Kafka либо вернет 1 МБ данных после получения запроса потребителя, либо вернет доступные данные через 100 мс, если условие встретил, он вернется немедленно.
7.3 max.partition.fetch.bytes
Это свойство указывает максимальное количество байтов, которое сервер возвращает потребителям из каждого раздела. Его значение по умолчанию — 1 МБ. Метод KafkaConsumer.poll() возвращает записи из каждого раздела до байтов, указанных в max.partition.fetch.bytes. Если тема имеет 20 разделов и 5 потребителей, то каждому потребителю требуется не менее 4 МБ свободной памяти для получения записей. При выделении памяти для потребителей можно выделять им больше, потому что если один из потребителей в группе выйдет из строя, то оставшимся потребителям потребуется иметь дело с большим количеством разделов.
Значение max.partition.fetch.bytes должно быть больше, чем максимальное количество байтов, которое брокер может получить (max.message.size), иначе потребитель не сможет прочитать эти сообщения, что приведет к зависанию потребителя и повторной попытке. .
При установке этого значения также необходимо учитывать, как долго потребитель обрабатывает данные. Потребителям необходимо часто вызывать метод poll(), чтобы избежать истечения срока действия сеанса и перебалансировки разделов.Если одним вызовом poll() возвращается слишком много данных, потребителям требуется больше времени для обработки, и они могут быть не в состоянии перейти к следующему. вовремя Опрос, чтобы избежать истечения сеанса. В этом случае вы можете уменьшить max.partition.fetch.bytes или увеличить время истечения сеанса.
7.4 session.timeout.ms
Значение этого свойства указывает, как долго потребитель может отключиться от сервера, прежде чем он будет считаться мертвым, по умолчанию — 3 с. Если потребитель не отправляет пульс координатору группы в течение времени, указанного в session.timeout.ms, он считается мертвым, и координатор инициирует перебалансировку, чтобы назначить свои разделы другим потребителям в группе. heartbeat.interval.ms указывает, как часто метод poll() отправляет контрольные сигналы координатору, а session.timeout.ms указывает, как долго потребители могут ждать без отправки контрольных сигналов. Поэтому обычно необходимо изменять эти два свойства одновременно.Heartbeat.interval.ms должен быть меньше, чем session.timeout.ms, который обычно составляет одну треть от session.timeout.ms.
Меньше session.timeout.ms: поврежденные узлы могут быть обнаружены и восстановлены быстрее, но длительный опрос или сборка мусора могут привести к непреднамеренной перебалансировке.
Увеличьте session.timeout.ms: это может уменьшить случайную перебалансировку, но для обнаружения сбоев узла требуется больше времени.
7.5 auto.offset.reset
Это свойство указывает, что должен делать потребитель при чтении раздела без смещения или если смещение недействительно (запись, содержащая смещение, устарела и удалена, поскольку срок действия потребителя истек в течение длительного времени). Его значение по умолчанию самое последнее.Если смещение недопустимо, потребитель начнет чтение данных с последней записи (записи, сгенерированной после запуска потребителя). Другое значение является самым ранним, в случае недопустимого смещения потребитель будет читать записи раздела с начальной позиции.
7.6 enable.auto.commit
Это свойство указывает, фиксирует ли потребитель смещение автоматически, значение по умолчанию — true. Чтобы максимально избежать дублирования данных и потери данных, вы можете установить для него значение false и контролировать, когда смещение фиксируется. Если установлено значение true, вы также можете контролировать частоту коммитов, настроив свойство auto.commit.interval.ms.
7.7 partition.assignment.strategy
Разделы назначаются потребителям в группе. PartitionAssignor решает, какие разделы должны быть назначены какому потребителю на основе заданного потребителя и темы. У Kafka есть две стратегии распределения по умолчанию.
Диапазон (по умолчанию): эта стратегия назначит потребителям несколько смежных разделов темы. Предположим, что потребители C1 и C2 подписываются на тему T1 и тему T2 одновременно, и каждая тема имеет 3 раздела. Затем потребитель C1 может быть назначен разделу 0 и разделу 1 этих двух тем, четырем разделам, а потребитель C2 назначен разделу 2 этих двух тем, двум разделам. Поскольку каждая тема имеет нечетное количество разделов, а назначения внутри темы выполняются независимо, первый потребитель в конечном итоге назначается большему количеству разделов, чем второй потребитель. Это происходит всякий раз, когда используется стратегия диапазона, а количество разделов не делится на количество потребителей.
org.apache.kafka.clients.consumer.RangeAssignor
RoundRobin: эта стратегия назначает потребителям все разделы темы один за другим. Если для назначения разделов потребителю C1 и потребителю C2 используется стратегия RoundRobin, то потребитель C1 будет назначен разделу 0 и разделу 2 темы T1 и разделу 1 темы T2; потребитель C2 будет назначен разделу 1 темы T1 и Раздел 0 и раздел 2 принципала T2.В общем случае, если все потребители подписываются на одну и ту же тему, стратегия RoundRobin назначает всем потребителям одинаковое количество разделов (разница не более чем в одном разделе).
org.apache.kafka.clients.consumer.RoundRobinAssignor
7.8 client.id
Это свойство может быть любой строкой и используется посредником для пометки сообщений, отправленных от клиента, обычно используемых в журналах, метриках и квотах.
7.9 max.poll.records
Это свойство используется для управления количеством записей, которые могут быть возвращены одним вызовом метода poll(), и может управлять объемом данных, которые необходимо обработать при опросе.
7.10 Receive.buffer.bytes и send.buffer.bytes
Также можно установить размер буфера TCP, используемого сокетом при чтении и записи данных. Если они установлены на -1, используются значения операционной системы по умолчанию. Если производитель или потребитель находится в другом центре обработки данных, чем брокер, эти значения могут быть соответствующим образом увеличены, поскольку сеть в центре обработки данных обычно имеет относительно высокую задержку и относительно низкую пропускную способность.
Использованная литература:
Эта статья занимает 7700 слов.После дня изучения и резюме, пожалуйста, поставьте лайк и поддержите.
- «Глубокое понимание основных принципов дизайна и практики Кафки»