1. Процесс отправки сообщения производителем
Во-первых, давайте представим процесс отправки сообщений производителями Kafka:
- Kafka упаковывает отправленное сообщение как объект ProducerRecord.Объект ProducerRecord содержит целевую тему и содержимое для отправки, а также может указывать ключ и раздел. Перед отправкой объекта ProducerRecord производитель сериализует объекты ключей и значений в массивы байтов, чтобы их можно было передавать по сети.
- Далее данные передаются разделителю. Если в объекте ProducerRecord ранее были указаны разделы, то разделитель ничего не сделает. Если раздел не указан, разделитель выберет раздел на основе ключа объекта ProducerRecord, а затем запись будет добавлена в пакет записей, и все сообщения в этом пакете будут отправлены в ту же тему и вышестоящий раздел. Отдельный поток отвечает за отправку пакетов этих записей соответствующему брокеру.
- Сервер возвращает ответ, когда он получает эти сообщения. Если сообщение успешно записано в Kafka, возвращается объект RecordMetaData, который содержит информацию о теме и разделе, а также смещение, записанное в разделе. Если запись не удалась, возвращается ошибка. Производитель попытается повторно отправить сообщение после получения ошибки.Если указанное количество попыток не увенчалось успехом, сразу будет выдано исключение, и повторная попытка не будет сделана.
2. Создайте производителя
2.1 Зависимости проекта
Этот проект создан с помощью Maven. Если вы хотите вызвать API производителя Kafka, вам нужно импортироватьkafka-clients
Зависимость, как показано ниже:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
2.2 Создать производителя
При создании производителя Kafka необходимо указать следующие три свойства:
- bootstrap.servers: укажите список адресов посредников. Список не обязательно должен содержать все адреса посредников. Производитель будет искать информацию о посредниках у данного посредника. Тем не менее, для обеспечения отказоустойчивости рекомендуется предоставить как минимум информацию о двух брокерах;
- key.serializer: сериализатор для указанного ключа;
- value.serializer: Сериализатор для указанного значения.
Образец созданного кода выглядит следующим образом:
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i,
"world" + i);
/* 发送消息*/
producer.send(record);
}
/*关闭生产者*/
producer.close();
}
}
Весь пример кода для этой статьи можно скачать с Github:kafka-basis
2.3 Тестирование
1. Запустите Кафку
Работа Kafka зависит от zookeeper, который нужно запустить заранее.Вы можете запустить встроенный zookeeper Kafka, или можете запустить собственную установку:
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
Запускаем одноузловую кафку для тестирования:
# bin/kafka-server-start.sh config/server.properties
2. Создать тему
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 1 \
--topic Hello-Kafka
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. Запустите потребителя
Запустите консольный потребитель, чтобы наблюдать за ситуацией записи.Команда запуска выглядит следующим образом:
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
4. Запустите проект
На данный момент вы можете увидеть потребительскую консоль, вывод выглядит следующим образом, здесьkafka-console-consumer
Будет распечатана только информация о значении, информация о ключе не будет распечатана.
2.4 Возможные проблемы
Здесь может возникнуть проблема, заключающаяся в том, что программа-производитель после запуска находилась в состоянии ожидания. Обычно это происходит, когда вы запускаете Kafka с конфигурацией по умолчанию, которая требуетserver.properties
в файлеlisteners
Конфигурация для изменения:
# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092
2. Отправить сообщение
Пример программы выше вызываетsend
Метод ничего не делает после отправки сообщения, в этом случае у нас нет возможности узнать результат отправки сообщения. Если вы хотите узнать результат отправки сообщения, вы можете использовать синхронную отправку или асинхронную отправку.
2.1 Синхронная передача
вызовsend
затем можно вызвать методget()
метод,send
Возвращаемое значение метода — это объект Future
for (int i = 0; i < 10; i++) {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*同步发送消息*/
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
Результат, полученный в это время, следующий: смещение связано с количеством вызовов, и все записи размещены в разделе 0, это связано с тем, чтоHello-Kafka
тема, использование--partitions
Укажите его количество разделов как 1, то есть раздел только один.
topic=Hello-Kafka, partition=0, offset=40
topic=Hello-Kafka, partition=0, offset=41
topic=Hello-Kafka, partition=0, offset=42
topic=Hello-Kafka, partition=0, offset=43
topic=Hello-Kafka, partition=0, offset=44
topic=Hello-Kafka, partition=0, offset=45
topic=Hello-Kafka, partition=0, offset=46
topic=Hello-Kafka, partition=0, offset=47
topic=Hello-Kafka, partition=0, offset=48
topic=Hello-Kafka, partition=0, offset=49
2.2 Асинхронная отправка
Обычно нас волнует не успех отправки, а больше неудача, поэтому Kafka предоставляет функции асинхронной отправки и обратного вызова. код показывает, как показано ниже:
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*异步发送消息,并监听回调*/
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
3. Пользовательский разделитель
У Kafka есть механизм разделения по умолчанию:
- Если значение ключа равно null, используйте алгоритм циклического перебора для равномерного распределения сообщений по каждому разделу;
- Если значение ключа не равно нулю, Kafka будет использовать встроенный алгоритм хеширования для хэширования ключа и распределения его по разделам.
В некоторых случаях у вас могут быть собственные требования к разделению, которые можно реализовать с помощью пользовательского разделителя. Вот пример пользовательского разделителя:
3.1 Пользовательский разделитель
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {
private int passLine;
@Override
public void configure(Map<String, ?> configs) {
/*从生产者配置中获取分数线*/
passLine = (Integer) configs.get("pass.line");
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
/*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
return (Integer) key >= passLine ? 1 : 0;
}
@Override
public void close() {
System.out.println("分区器关闭");
}
}
Вам необходимо указать разделитель при создании производителя и параметры конфигурации, требуемые разделителем:
public class ProducerWithPartitioner {
public static void main(String[] args) {
String topicName = "Kafka-Partitioner-Test";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*传递自定义分区器*/
props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
/*传递分区器所需的参数*/
props.put("pass.line", 6);
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
String score = "score:" + i;
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
/*异步发送消息*/
producer.send(record, (metadata, exception) ->
System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
}
producer.close();
}
}
3.2 Тестирование
Необходимо создать тему как минимум с двумя разделами:
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 2 \
--topic Kafka-Partitioner-Test
На этом этапе ввод выглядит следующим образом: вы можете видеть, что те, у кого баллы больше или равны 6, относятся к 1-му дивизиону, а те, у кого меньше 6 баллов, относятся к 0-му дивизиону.
score:6, partition=1,
score:7, partition=1,
score:8, partition=1,
score:9, partition=1,
score:10, partition=1,
score:0, partition=0,
score:1, partition=0,
score:2, partition=0,
score:3, partition=0,
score:4, partition=0,
score:5, partition=0,
分区器关闭
4. Другие атрибуты производителя
При создании вышеупомянутых производителей указывается только адрес службы, сериализатор ключей и сериализатор значений.На самом деле производители Kafka имеют множество настраиваемых свойств, а именно:
1. acks
Параметр acks указывает, сколько реплик раздела должно получить сообщение, прежде чем производитель сочтет сообщение успешным:
- acks=0: сообщение считается успешным, когда оно отправлено, и оно не будет ждать ответа от сервера;
- acks=1: пока ведущий узел кластера получает сообщение, производитель получит успешный ответ от сервера;
- acks=all: производитель получит успешный ответ от сервера только тогда, когда все узлы, участвующие в репликации, получат сообщение.
2. buffer.memory
Устанавливает размер буфера памяти производителя.
3. compression.type
По умолчанию отправляемые сообщения не сжимаются. Если вы хотите сжать, вы можете настроить этот параметр, необязательные значения — snappy, gzip, lz4.
4. retries
Количество повторных отправок сообщения после возникновения ошибки. Если заданное значение достигнуто, производитель прекратит повторную попытку и вернет ошибку.
5. batch.size
Если в один и тот же раздел необходимо отправить несколько сообщений, производитель поместит их в один и тот же пакет. Этот параметр указывает объем памяти, который может использовать пакет, в байтах.
6. linger.ms
Этот параметр указывает, как долго производитель ожидает добавления дополнительных сообщений к пакету перед отправкой пакета.
7. clent.id
Идентификатор клиента, используемый сервером для идентификации источника сообщения.
8. max.in.flight.requests.per.connection
Указывает, сколько сообщений может отправить производитель, прежде чем получит ответ от сервера. Более высокие значения используют больше памяти, но также улучшают пропускную способность, установка значения 1 гарантирует, что сообщения будут записываться на сервер в том порядке, в котором они были отправлены, даже если происходят повторные попытки.
9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
- timeout.ms указывает время подтверждения, в течение которого посредник должен ждать, пока синхронная реплика вернет сообщение;
- request.timeout.ms указывает время ожидания производителем ответа сервера при отправке данных;
- metadata.fetch.timeout.ms указывает, как долго производитель ожидает ответа от сервера при получении метаданных (например, кто является лидером раздела).
10. max.block.ms
указанный в звонкеsend()
метод или использованиеpartitionsFor()
Время блокировки производителя, когда метод получает метаданные. Эти методы блокируются, когда буфер отправки производителя заполнен или когда нет доступных метаданных. Когда время блокировки достигает max.block.ms, производитель выдает исключение тайм-аута.
11. max.request.size
Этот параметр используется для управления размером запроса, отправляемого производителем. Он может относиться к максимальному значению одного отправленного сообщения или к общему размеру всех сообщений в одном запросе. Например, если предположить, что это значение равно 1000 КБ, то самое большое отдельное сообщение, которое может быть отправлено, равно 1000 КБ, или производитель может отправить пакет из 1000 сообщений, каждое размером 1 КБ, в одном запросе.
12. receive.buffer.bytes & send.buffer.byte
Эти два параметра определяют размер буферов приема и отправки пакетов TCP-сокета соответственно, а -1 представляет собой значение по умолчанию для операционной системы.
использованная литература
- Неха Наркхеде, Гвен Шапира, Тодд Палино (автор), Сюэ Миндэн (переводчик). Полное руководство по Кафке. People's Posts and Telecommunications Press. 26 декабря 2017 г.
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным