Kafka Series (4) — Подробное объяснение потребителей Kafka

Kafka

1. Потребители и группы потребителей

В Kafka потребители обычно являются частью группы потребителей, и когда несколько групп потребителей читают одну и ту же тему вместе, они не влияют друг на друга. Причина, по которой Kafka вводит концепцию групп потребителей, заключается в том, что потребители Kafka часто выполняют некоторые операции с высокой задержкой, такие как запись данных в базу данных или HDFS, или выполнение трудоемких вычислений. скорость генерации данных. В настоящее время можно добавить больше потребителей, чтобы разделить нагрузку и обрабатывать сообщения из некоторых разделов соответственно.Это основной метод Kafka для достижения горизонтального масштабирования.

https://github.com/heibaiying

Следует отметить, что один и тот же раздел может быть прочитан только одним потребителем в одной группе потребителей, и невозможно, чтобы один и тот же раздел читался несколькими потребителями в одной группе потребителей, как показано на рисунке:

https://github.com/heibaiying

Видно, что даже если потребитель Consumer5 простаивает, он не будет читать данные ни одного раздела, что также напоминает нам о том, что количество потребителей должно быть установлено разумно, чтобы избежать простоя и дополнительных накладных расходов.

2. Перебалансировка разделов

Поскольку потребители в группе совместно читают разделы темы, когда потребитель завершает работу или дает сбой, он покидает группу, а разделы, первоначально прочитанные им, будут прочитаны другими потребителями в группе Pick. В то же время при изменении темы, например добавлении нового раздела, также будет происходить перераспределение разделов и потребителей, а право собственности на разделы переходит от одного потребителя к другому, такое поведение называется ребалансировкой. Благодаря ребалансировке группы потребителей могут гарантировать высокую доступность и масштабируемость.

Потребители сохраняют свою принадлежность к группе и право собственности на разделы, отправляя тактовые импульсы брокеру, в котором находится координатор группы. Пока потребитель отправляет тактовые импульсы с регулярными интервалами, он считается активным, что указывает на то, что он все еще читает сообщения в разделе. Потребители отправляют тактовые импульсы при опросе сообщений или совершении смещений. Если потребитель перестанет отправлять тактовые импульсы достаточно долго, срок действия сеанса истекает, и координатор группы считает его мертвым, вызывая перебалансировку.

3. Создайте потребителей Kafka

При создании потребителя необходимы следующие три параметра:

  • bootstrap.servers: укажите список адресов брокеров. Список не обязательно должен содержать все адреса брокеров. Производитель будет искать информацию о брокерах у данного брокера. Тем не менее, для обеспечения отказоустойчивости рекомендуется предоставить как минимум информацию о двух брокерах;
  • key.deserializer: десериализатор для указанного ключа;
  • value.deserializer: десериализатор для указанного значения.

Кроме того, вам также необходимо указать тему, на которую вы хотите подписаться.Вы можете использовать следующие два API:

  • consumer.subscribe(Collection<String> topics): указывает набор тем, на которые необходимо подписаться;
  • consumer.subscribe(Pattern pattern): используйте регулярные выражения для сопоставления коллекций, на которые необходимо подписаться.

Наконец, просто передайте API опроса (poll) для периодического запроса данных с сервера. Как только потребитель подписывается на тему, опрос обрабатывает все детали, включая групповую координацию, перебалансировку разделов, отправку тактов и выборку данных, что позволяет разработчикам сосредоточиться только на данных, возвращаемых из разделов, а затем выполнять бизнес-обработку. Пример выглядит следующим образом:

String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

/*订阅主题 (s)*/
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        /*轮询获取数据*/
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
           record.topic(), record.partition(), record.key(), record.value(), record.offset());
        }
    }
} finally {
    consumer.close();
}

Весь пример кода для этой статьи можно скачать с Github:kafka-basis

3. Автоматически фиксируйте смещения

3.1 Важность смещений

Каждое сообщение в Kafka имеет свойство смещения, которое записывает его положение в разделе, а смещение представляет собой монотонно возрастающее целое число. Потребители проходят_consumer_offsetСпециальная тема для отправки сообщений, содержащих смещения для каждого раздела. Если потребитель работал, смещение не имеет значения. Какая польза. Однако, если потребитель уйдет или присоединяется новый раздел, будет запущена перебалансировка. После завершения ребалансировки каждый потребитель может быть назначен на новый раздел вместо ранее обработанного. Чтобы иметь возможность продолжить предыдущую работу, потребителю необходимо прочитать смещение последней фиксации каждого раздела, а затем продолжить обработку с точки, указанной смещением. По этой причине, если смещение не может быть отправлено правильно, это может привести к потере данных или повторному использованию, например, в следующих ситуациях:

  • Если отправленное смещение меньше, чем смещение последнего сообщения, обработанного клиентом, то сообщение между двумя смещениями будет потребляться повторно;
  • Если зафиксированное смещение больше, чем смещение последнего сообщения, обработанного клиентом, то сообщения между двумя смещениями будут потеряны.

3.2 Автоматическая фиксация смещений

Kafka поддерживает как автоматическую фиксацию, так и фиксацию смещения вручную. Вот относительно простая автоматическая отправка:

Только у потребителяenable.auto.commitсвойства настроены какtrueНастройка автоматической фиксации завершена. В это время через равные промежутки времени потребители будутpoll()Максимальное смещение, полученное методом отправки, интервал отправки определяетсяauto.commit.interval.msсвойство для настройки, значение по умолчанию — 5 с.

В использовании автоматической фиксации есть скрытые опасности. Предположим, мы используем интервал фиксации по умолчанию, равный 5 с, и перебалансировка происходит через 3 с после последней фиксации.После перебалансировки потребители начинают читать сообщения со смещения позиции последней фиксации. В это время смещение уже отстает на 3 с, поэтому сообщения, поступающие в течение этих 3 с, будут обрабатываться повторно. Можно чаще фиксировать смещения, изменив интервал фиксации, чтобы уменьшить временное окно, в течение которого могут появляться повторяющиеся сообщения, но этого нельзя полностью избежать. По этой причине Kafka также предоставляет API для ручной отправки смещений, что позволяет пользователям более гибко отправлять смещения.

В-четвертых, вручную отправьте смещение

Пользователи могутenable.auto.commitустановить какfalse, затем зафиксируйте смещение вручную. Представление смещений вручную на основе требований пользователя можно разделить на две категории:

  • Вручную отправить текущее смещение: то есть вручную отправить максимальное смещение текущего опроса;
  • Вручную отправьте фиксированное смещение: то есть отправьте фиксированное смещение в соответствии с бизнес-требованиями.

Согласно Kafka API, ручная отправка смещения может быть разделена на синхронную отправку и асинхронную отправку.

4.1 Синхронная фиксация

позвонивconsumer.commitSync()Для выполнения синхронной отправки, когда никакие параметры не передаются, отправляется максимальное смещение текущего опроса.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /*同步提交*/
    consumer.commitSync();
}

Если отправка не удалась, синхронная отправка также будет повторена, что может гарантировать, что данные могут быть отправлены успешно в максимальной степени, но это также снизит пропускную способность программы. По этой причине Kafka также предоставляет API для асинхронных коммитов.

4.2 Асинхронная фиксация

Асинхронная отправка может повысить пропускную способность программы, поскольку в это время вы можете запрашивать данные, не дожидаясь ответа брокера. код показывает, как показано ниже:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /*异步提交并定义回调*/
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null) {
             System.out.println("错误处理");
             offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
                                                            x.topic(), x.partition(), y.offset()));
            }
        }
    });
}

Проблема с асинхронными фиксациями заключается в том, что в случае сбоя фиксации не происходит автоматического повтора, и фактически нет автоматического повтора. Предположим, что программа одновременно отправляет смещения 200 и 300. В этот раз смещение 200 завершается ошибкой, но успешно выполняется смещение 300. Если вы повторите попытку, смещение 200 перезапишет смещение 300. возможности. Синхронная отправка не имеет этой проблемы, потому что в случае синхронной отправки запрос на отправку 300 должен ждать, пока сервер вернет успешный ответ на запрос на отправку 200, прежде чем он будет отправлен. По этой причине в некоторых случаях необходимо совмещать как синхронные, так и асинхронные методы фиксации.

Примечание. Хотя программа не может автоматически повторить попытку в случае сбоя, мы можем повторить попытку вручную.Вы можете сохранить смещение каждого раздела, отправленного вами, с помощью смещений Map, а затем, когда произойдет сбой. неудачное смещение меньше, чем последнее отправленное смещение той же темы и того же раздела, который вы поддерживаете. Если оно меньше, это означает, что вы отправили запрос на большее смещение, и вам не нужно повторять попытку в это время. В противном случае , можно выполнить повторную попытку вручную.

4.3 Синхронная плюс асинхронная фиксация

В следующем случае асинхронная отправка используется при обычном опросе для обеспечения пропускной способности, но поскольку потребитель должен быть закрыт в конце, в это время требуется синхронная отправка, чтобы обеспечить максимальный успех отправки.

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
        // 异步提交
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // 因为即将要关闭消费者,所以要用同步提交保证提交成功
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

4.4 Зафиксировать конкретное смещение

Фактически, в приведенных выше API синхронной и асинхронной отправки мы не передавали параметры методу фиксации. В настоящее время отправка по умолчанию — это максимальное смещение текущего опроса. Если вам нужно отправить определенные смещения, вы можете вызвать их перегруженный метод.

/*同步提交特定偏移量*/
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) 
/*异步提交特定偏移量*/    
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

Обратите внимание, что поскольку вы можете подписаться на несколько тем,offsetsдолжен содержать смещение каждого раздела всех тем, пример кода выглядит следующим образом:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            /*记录每个主题的每个分区的偏移量*/
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData");
            /*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
            offsets.put(topicPartition, offsetAndMetadata);
        }
        /*提交特定偏移量*/
        consumer.commitAsync(offsets, null);
    }
} finally {
    consumer.close();
}

5. Отслеживайте перебалансировку разделов

Поскольку повторная балансировка разделов приведет к перераспределению разделов и потребителей, иногда вам может потребоваться выполнить некоторые действия перед повторной балансировкой: зафиксировать смещения, которые были обработаны, но еще не зафиксированы, закрыть соединения с базой данных и т. д. В этот момент при подписке на тему звонитеsubscribeПерегруженный метод передается в настраиваемый прослушиватель перебалансировки разделов.

 /*订阅指定集合内的所有主题*/
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
 /*使用正则匹配需要订阅的主题*/    
subscribe(Pattern pattern, ConsumerRebalanceListener listener)    

Пример кода выглядит следующим образом:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
    /*该方法会在消费者停止读取消息之后,再均衡开始之前就调用*/
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("再均衡即将触发");
        // 提交已经处理的偏移量
        consumer.commitSync(offsets);
    }

    /*该方法会在重新分配分区之后,消费者开始读取消息之前被调用*/
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

    }
});

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
            /*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
            offsets.put(topicPartition, offsetAndMetadata);
        }
        consumer.commitAsync(offsets, null);
    }
} finally {
    consumer.close();
}

6. Выход из опроса

Кафка обеспечиваетconsumer.wakeup()метод используется для выхода из опроса, он выдаетWakeupExceptionисключение, чтобы выйти из цикла. Важно отметить, что вызовы, которые лучше всего отображаются при выходе из потокаconsumer.close()В это время потребители будут представлены все, что еще не представлено и отправляют сообщения в Группу координатора, сказали, что они должны покинуть группу, то она будет запускаться перебалансировать, не дожидаясь время ожидания сеанса.

Следующий пример кода предназначен для мониторинга вывода консоли при вводеexitКогда опрос заканчивается, потребитель закрывается и программа завершает работу:

/*调用 wakeup 优雅的退出*/
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (sc.hasNext()) {
        if ("exit".equals(sc.next())) {
            consumer.wakeup();
            try {
                /*等待主线程完成提交偏移量、关闭消费者等操作*/
                mainThread.join();
                break;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}).start();

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> rd : records) {
            System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
                              rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
        }
    }
} catch (WakeupException e) {
    //对于 wakeup() 调用引起的 WakeupException 异常可以不必处理
} finally {
    consumer.close();
    System.out.println("consumer 关闭");
}

7. Независимые потребители

Поскольку Kafka рассчитана на высокую пропускную способность и низкую задержку, в Kafka потребители обычно подчиняются группе, поскольку вычислительная мощность одного потребителя ограничена. Но иногда ваши потребности могут быть очень простыми, например, вам может понадобиться только один потребитель для чтения данных из всех разделов темы или определенного раздела.В настоящее время нет необходимости в группах потребителей и ребалансировке, просто нужно назначить темы или разделы для потребителей, затем начните читать сообщения и отправлять смещения.

В этом случае нет необходимости подписываться на тему, вместо этого потребитель назначает раздел себе. Потребитель может подписаться на тему (и присоединиться к группе потребителей) или назначить себе раздел, но не то и другое одновременно. Пример кода для выделения разделов выглядит следующим образом:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

/*可以指定读取哪些分区 如这里假设只读取主题的 0 分区*/
for (PartitionInfo partition : partitionInfos) {
    if (partition.partition()==0){
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
}

// 为消费者指定分区
consumer.assign(partitions);


while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<Integer, String> record : records) {
        System.out.printf("partition = %s, key = %d, value = %s\n",
                          record.partition(), record.key(), record.value());
    }
    consumer.commitSync();
}

Приложение: Дополнительные свойства Kafka Consumer

1. fetch.min.byte

Минимальное количество байтов, которое потребитель может получить с сервера. Если количество доступных данных меньше установленного значения, брокер будет ждать достаточного количества доступных данных, прежде чем вернуть их потребителю.

2. fetch.max.wait.ms

Время ожидания для брокера, чтобы вернуть данные потребителям, по умолчанию составляет 500 мс.

3. max.partition.fetch.bytes

Это свойство указывает максимальное количество байтов, которое сервер возвращает потребителям из каждого раздела, по умолчанию 1 МБ.

4. session.timeout.ms

Количество времени, в течение которого потребитель может отключиться от сервера, прежде чем он будет считаться мертвым, по умолчанию составляет 3 с.

5. auto.offset.reset

Это свойство указывает, что должен делать потребитель при чтении раздела без смещения или если смещение недопустимо:

  • last (по умолчанию): в случае недопустимого смещения потребитель начнет чтение данных с последней записи (последней записи, сгенерированной после запуска потребителя);
  • самый ранний: в случае, если смещение неверно, потребитель будет читать записи раздела с начальной позиции.

6. enable.auto.commit

Следует ли автоматически фиксировать смещения, значение по умолчанию — true. Чтобы избежать повторного потребления и потери данных, можно установить значение false.

7. client.id

Идентификатор клиента, используемый сервером для идентификации источника сообщения.

8. max.poll.records

одиночный звонокpoll()Количество записей, которые может вернуть метод.

9. receive.buffer.bytes & send.buffer.byte

Эти два параметра определяют размер буферов приема и отправки пакетов TCP-сокета соответственно, а -1 представляет собой значение по умолчанию для операционной системы.

использованная литература

  1. Неха Наркхеде, Гвен Шапира, Тодд Палино (автор), Сюэ Миндэн (переводчик). Полное руководство по Кафке. People's Posts and Telecommunications Press. 26 декабря 2017 г.

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным