Кафка для очереди сообщений

Java Kafka

Особенности Кафки

Kafka была сначала разработана LinkedIn как распределенная система обмена сообщениями на основе публикации/подписки, а позже стала проектом верхнего уровня Apache. Основные особенности заключаются в следующем:

1. Высокая пропускная способность как для публикации, так и для подписки

Целью разработки Kafka является обеспечение сохраняемости сообщений с временной сложностью O(1) и обеспечение постоянной производительности доступа даже к терабайтам данных и более. Даже на очень дешевых коммерческих машинах одна машина может поддерживать передачу 100 тысяч сообщений в секунду.

2. Сохранение сообщения

Сохраняйте сообщения на диск, чтобы их можно было использовать для пакетного потребления, например ETL, а также для приложений реального времени. Предотвратите потерю данных за счет сохранения данных на жестком диске и репликации.

3. Распределенный

Он поддерживает разделение сообщений и распределенное потребление между серверами, обеспечивая при этом последовательную передачу сообщений внутри каждого раздела. Это упрощает масштабирование, и будет несколько распределенных производителей, брокеров и потребителей. Машины могут быть расширены без простоев.

4. Используйте режим извлечения для получения сообщений

Состояние обрабатываемого сообщения сохраняется на стороне потребителя, а не на стороне сервера.Брокер не имеет состояния, и потребитель сам сохраняет смещение.

5. Поддержка онлайн и оффлайн сценариев.

Поддерживается как автономная обработка данных, так и обработка данных в реальном времени.

Основные понятия Кафки

1. Broker

Один или несколько серверов в кластере Kafka вместе называются брокером.

2. Topic

Каждое сообщение, опубликованное в Kafka, имеет категорию Topic. (физически разные Тематические сообщения хранятся отдельно. Логически, хотя сообщение темы хранится на одном или нескольких брокерах, пользователю нужно только указать тему сообщения для создания или потребления данных, не беспокоясь о том, где данные хранятся)

3. Partition

Тема физически сгруппирована.Тема может быть разделена на несколько разделов, и каждый раздел представляет собой упорядоченную очередь. Каждому сообщению в разделе назначается упорядоченный идентификатор (смещение).

4. Producer

Под производителем сообщений и данных можно понимать клиента, который отправляет сообщения в Kafka.

5. Consumer

Потребителей сообщений и данных можно понимать как клиентов, которые получают сообщения от Kafka.

6. Consumer Group

Каждый Потребитель принадлежит к определенной Группе Потребителей (Имя группы может быть указано для каждого Потребителя, если Имя группы не указано, оно принадлежит к Группе по умолчанию). Это метод, который Kafka использует для реализации широковещательной рассылки сообщения Topic (отправляемой всем Потребителям) и одноадресной рассылки (отправляемой любому Потребителю). Тема может иметь несколько групп потребителей. Тематические сообщения будут реплицированы (концептуально, не реплицированы на самом деле) во все группы потребителей, но каждая группа потребителей будет отправлять сообщения только одному потребителю в группе потребителей. Если вы хотите внедрить широковещательную рассылку, при условии, что у каждого потребителя есть независимая группа потребителей. Если вы хотите реализовать одноадресную рассылку, пока все потребители находятся в одной группе потребителей. Группы потребителей также можно использовать для свободной группировки потребителей без многократной отправки сообщений в разные темы.

Кафка инсталляция

Пользователи Mac используют HomeBrew для установки, и brew необходимо обновить перед установкой.

brew update

Затем установите кафку

brew install kafka

После завершения установки вы можете просмотреть файл конфигурации kafka

cd /usr/local/etc/kafka

kafka 配置文件
Расположение kafka, установленного HomeBrew на моем компьютере, — это /usr/local/Cellar/kafka/0.11.0.1/bin , вы можете видеть, что версия kafka, установленная HomeBrew, уже 0.11.0.1.

Для Kafka требуется zookeeper.Когда HomeBrew устанавливает kafka, Zookeeper будет установлен одновременно. Давайте сначала запустим zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

Затем запустите кафку

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

Создайте тему, установите количество разделов равным 2 и назовите тему тестовой темой. В следующих примерах эта тема используется

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-topic

Просмотр созданных тем

cd /usr/local/Cellar/kafka/0.11.0.1
./bin/kafka-topics --list --zookeeper localhost:2181

Тест командной строки Kafka

Отправить сообщение

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-producer --broker-list localhost:9092 --topic test-topic

Использование сообщений

cd /usr/local/Cellar/kafka/0.11.0.1/bin
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --from-beginning

удалить тему

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --delete --zookeeper localhost:2181 --topic test-topic

Если в server.properties в конфигурационном файле, загружаемом при запуске kafka, не настроено delete.topic.enable=true, то удаление в это время не является реальным удалением, а тема помечается как: отмечена для удаления

просмотреть все темы

cd /usr/local/Cellar/kafka/0.11.0.1/bin
./bin/kafka-topics --zookeeper localhost:2181 --list 

Физически удалять темы

登录zookeeper客户端:/usr/local/Cellar/zookeeper/3.4.10/bin/zkCli
找到topic所在的目录:ls /brokers/topics
找到要删除的topic,执行命令:rmr /brokers/topics/test-topic 即可,此时topic被彻底删除

Доступ к Java-клиенту

1. Добавьте зависимости в файл pom проекта maven.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>

2. Производитель сообщений

package org.study.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class ProducerSample {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("zk.connect", "127.0.0.1:2181");//zookeeper 的地址
        props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        String topic = "test-topic";
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 1"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 2"));
        producer.send(new ProducerRecord<String, String>(topic, "idea-key2", "java-message 3"));

        producer.close();
    }

}

3. Потребители сообщений

package org.study.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSample {

    public static void main(String[] args) {
        String topic = "test-topic";// topic name

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。
        props.put("group.id", "testGroup1");// Consumer Group Name
        props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交
        props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
        }

    }
}

4. Запустите зоопарк

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

5. Запустите кафка-сервер

kafka-server-start /usr/local/etc/kafka/server.properties

6. Запустите потребителя

Сначала запустите приемник, чтобы, когда производитель отправляет сообщение, вы могли видеть запись сообщения в бэкэнде потребителя.

7. Запустите продюсер

Запустите Producer, опубликуйте несколько сообщений, и вы сможете увидеть полученные сообщения в консоли Consumer.

Consumer 控制台

Конфигурация кластера Кафка

Обычно для kafka существует три типа конфигураций кластера, а именно: один узел — один брокер, один узел — несколько брокеров, несколько узлов — несколько брокеров.

Первые два на самом делеНа официальном сайте есть введение.

single node - single broker

单节点单 broker

1. Запустите зоопарк

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. Запустите брокера кафки

kafka-server-start /usr/local/etc/kafka/server.properties

3. Создайте тему кафки

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-singlenode-singlebroker

4. Запустите производителя для отправки информации

kafka-console-producer --broker-list localhost:9092 --topic topic-singlenode-singlebroker

Два параметра Broker-List и Topic являются обязательными.broker-list указывает адрес брокера, к которому необходимо подключиться, в формате node_address:port. тема обязательна, потому что сообщения должны быть отправлены подписавшимся Группа потребителей темы. Теперь можно ввести некоторую информацию в командную строку, и каждая строка будет рассматриваться как сообщение.

发送消息

5. Запустите потребителя для потребления сообщений

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic-singlenode-singlebroker

После запуска зоопарка, брокера, производителя и потребителя в разных окнах терминала Введите сообщение в терминал производителя, и сообщение будет отображаться в терминале потребителя.

消息显示

single node - multiple broker

单节点多 broker

1. Запустите зоопарк

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

2. Запустите брокера

Если вам нужно запустить несколько брокеров на одном узле (т. е. машине) (здесь для примера запущены три брокера), вам нужно подготовить несколько файлов server.properties, поэтому вам нужно скопировать /usr/local/etc /kafka/server файл .properties. Поскольку для каждого брокера необходимо указать отдельный файл конфигурации свойств, три свойства Broker.id , port , log.dir должны быть разными.

Создайте новый каталог kafka-example и три каталога для хранения журналов.

mkdir kafka-example
mkdir kafka-logs-1
mkdir kafka-logs-2
mkdir kafka-logs-3

Скопируйте файл /usr/local/etc/kafka/server.properties три раза.

cp server.properties /Users/niwei/Downloads/kafka-example/server-1.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-2.properties
cp server.properties /Users/niwei/Downloads/kafka-example/server-3.properties

В файле конфигурации server-1.properties брокера1 необходимо изменить следующие параметры:

broker.id=1
port=9093
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1

В файле конфигурации server-2.properties брокера2 необходимо изменить следующие параметры:

broker.id=2
port=9094
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-2

В файле конфигурации server-3.properties брокера3 необходимо изменить следующие параметры:

broker.id=3
port=9095
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-3

запустить каждого брокера

cd /Users/niwei/Downloads/kafka-example
kafka-server-start server-1.properties
kafka-server-start server-2.properties
kafka-server-start server-3.properties

3. Создайте тему

Создайте тему с именем theme-singlenode-multiplebroker.

kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic topic-singlenode-multiplebroker

4. Запустите производителя для отправки информации

Если производителю необходимо подключиться к нескольким брокерам, ему необходимо передать параметр Broker-List.

kafka-console-producer --broker-list localhost:9093, localhost:9094, localhost:9095 --topic topic-singlenode-multiplebroker

5. Запустите потребителя для потребления сообщений

kafka-console-consumer --zookeeper localhost:2181 --topic topic-singlenode-multiplebroker

单节点多 broker 消费消息

multiple node - multiple broker

多节点多 broker
В кластере с несколькими узлами и несколькими брокерами на каждом узле необходимо установить Kafka, и все брокеры подключены к одному и тому же зоопарку. Конечно, zookeeper также можно настроить как кластер, конкретные шаги см. в том, что я писал ранее.Создайте кластер зоопарка.

1. Конфигурация кластера Kafka

broker.id=1  #当前机器在集群中的唯一标识
port=9093 #当前 kafka 对外提供服务的端口,默认是 9092
host.name=192.168.121.101 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
log.dirs=/Users/niwei/Downloads/kafka-example/kafka-logs-1 #消息存放的目录,这个目录可以配置为逗号分割的表达式
zookeeper.connect=192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 #设置 zookeeper 集群的连接端口

num.network.threads=3 #这个是 borker 进行网络处理的线程数
num.io.threads=5 #这个是 borker 进行 IO 处理的线程数
socket.send.buffer.bytes=102400 #发送缓冲区的大小,数据先回存储到缓冲区了到达一定的大小后在发送能提高性能
socket.receive.buffer.bytes=102400 #接收缓冲区的大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向 kafka 请求消息或者向 kafka 发送消息的请求的最大数,这个值不能超过 jvm 的堆栈大小
num.partitions=1 #默认的分区数,一个 topic 默认1个分区数
log.retention.hours=24 #默认消息的最大持久化时间,24小时
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka 保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是因为 kafka 的消息是以追加的形式落地到文件,当超过这个值的时候,kafka 会新建一个文件
log.retention.check.interval.ms=300000 #每隔 300000 毫秒去检查上面配置的 log 失效时间(log.retention.hours=24 ),到目录查看是否有过期的消息如果有则删除
log.cleaner.enable=false #是否启用 log 压缩,一般不用启用,启用的话可以提高性能

Поскольку он многоузловой и мультиброкерский, конфигурационный файл server.properties каждого брокера необходимо изменить в соответствии с приведенными выше инструкциями.

2. Модификация конфигурации производителя

kafka-console-producer --broker-list 192.168.21.1:9092,192.168.21.2:9092,192.168.21.3:9092 --topic topic-multiplenode-multiplebroker

3. Модификация конфигурации потребителя

kafka-console-consumer --zookeeper 192.168.120.101:2181,192.168.120.102:2181,192.168.120.103:2181 --topic topic-multiplenode-multiplebroker

Конфигурация высокой надежности Kafka

Kafka обеспечивает высокую эластичность избыточности данных.Для сценариев, требующих высокой надежности данных, вы можете увеличить количество резервных копий данных (replication.factor), увеличить минимальное количество реплик записи (min.insync.replicas) и т. д. и т. д., но это повлияет на производительность. Наоборот, производительность повышается, а надежность снижается, и пользователям приходится делать некоторый компромисс между своими бизнес-характеристиками.

Чтобы данные, записываемые в Kafka, были безопасными и надежными, необходимы следующие конфигурации:

1. Конфигурация темы

replication.factor>=3, то есть количество реплик не менее 3 2

2. Настройка брокера

Условие выбора лидера unclean.leader.election.enable=false

3. конфигурация производителя

request.required.acks=-1, производитель.тип=синхронизация

Секрет высокой производительности в Kafka

С функциональной точки зрения ПО промежуточного слоя сообщений делится на две категории: запись данных и чтение данных Оптимизация также может рассматриваться с этих двух сторон.

Для оптимизации скорости записи Kafka использует следующие методы:

1. Последовательное письмо

Большинство дисков по-прежнему являются механическими структурами (SSD не обсуждается). механическое перемещение (относительно памяти). ) отнимает много времени, из-за чего скорость записи на диск оказывается на порядки хуже скорости записи в память. Чтобы избежать затрат времени, вызванных случайной записью, Kafka использует последовательную запись для хранения данных, как показано на следующем рисунке:

顺序写
Каждое сообщение добавляется к разделу, принадлежащему диску с последовательной записью, поэтому эффективность очень высока. Но у этого подхода есть недостаток: нет возможности удалить данные. Таким образом, Kafka не будет удалять данные, он сохранит все данные, у каждого потребителя (Consumer) есть по одному для каждой темы. смещение используется для указания количества прочитанных фрагментов данных.
消费消息
На приведенном выше рисунке есть два потребителя, и Consumer1 имеет два смещения, соответствующие Partition0 и Partition1 соответственно (при условии, что каждая тема имеет один раздел). Consumer2 имеет смещение, соответствующее Partition2. Это смещение сохраняется клиентским SDK.Kafka's Broker полностью игнорирует существование этой вещи.В нормальных условиях SDK сохранит его в zookeeper. Если сообщение не удалить, жесткий диск точно будет заполнен, поэтому Kakfa предлагает две стратегии удаления данных. Один основан на времени, а другой на размере файла раздела.Для конкретной конфигурации см. соответствующий документ конфигурации. Даже если оно записывается последовательно, большое количество мелких операций ввода-вывода, которые слишком часты, вызовут узкое место на диске, поэтому обработка Kakfa здесь заключается в группировании этих сообщений вместе и отправке их пакетами, что уменьшает чрезмерную работу дисковый ввод-вывод и вместо отправки одного сообщения за раз.

2. Файлы с отображением памяти

Даже если она будет записываться на жесткий диск последовательно, скорость доступа жесткого диска все равно невозможно догнать в памяти. Таким образом, данные Kafka не записываются на жесткий диск в режиме реального времени, а в полной мере использует память подкачки современной операционной системы, чтобы использовать память для повышения эффективности ввода-вывода. Файлы отображения памяти (далее именуемые mmap) также преобразуются в файлы отображения памяти, которые обычно могут представлять файлы данных 20G в 64-разрядной операционной системе.Его принцип работы заключается в непосредственном использовании страницы операционной системы для реализации прямого отображения файлы в физическую память. Операции с физической памятью после выполнения сопоставления синхронизируются с жестким диском (при необходимости операционной системой). Процесс mmap читает и записывает память так же, как читает и записывает жесткие диски, и ему не нужно заботиться о размере памяти.Для нас есть виртуальная память. Использование этого метода может значительно улучшить ввод-вывод, потому что он экономит накладные расходы на копирование пространства пользователя в пространство ядра (вызов функции чтения файла сначала поместит данные в память пространства ядра, а затем скопирует их в пространство ядра). пространство пользователя в памяти) Но у этого есть и очевидный недостаток — ненадежность, данные, записываемые в mmap, на самом деле не записываются на жесткий диск, и операционная система фактически записывает данные на жесткий диск только тогда, когда программа активно вызывает флеш. Поэтому в Kafka предусмотрен параметр - Producer.type для управления активно ли сбрасывается.Если Kafka пишет в mmap, то сбрасывает сразу и потом возвращается в Producer, что называется синхронизацией (sync).Если возвращает сразу после записи в mmap , Продюсер не вызывает флеш. , называется асинхронным (async).

3. Стандартизированный двоичный формат сообщения

Чтобы избежать неэффективного копирования байтов, влияние является значительным, особенно в условиях высокой нагрузки. Чтобы избежать этого, Kafka принимает стандартизированный двоичный формат сообщений, используемый производителями, брокерами и потребителями, так что блоки данных могут свободно передаваться между ними без преобразования, что снижает накладные расходы на копирование байтов.

При оптимизации скорости чтения Kafka в основном использует нулевое копирование.

Технология нулевого копирования:

В традиционном режиме читаем файл с жесткого диска вот так

文件传输到 Socket 的常规方式
(1) Операционная система считывает данные с диска в область буфера страниц пространства ядра.

(2) Приложение считывает данные из пространства ядра в кеш пользовательского пространства.

(3) Приложение записывает данные в кэш сокетов пространства ядра.

(4) Операционная система записывает данные из кэша сокетов в кэш сетевой карты, чтобы данные можно было отправить по сети.

Это явно неэффективно, тут четыре копии и два системных вызова. Операционные системы Unix предоставляют оптимизированный путь для передачи данных из буфера страницы в сокет для этой ситуации. В Linux это делается с помощью системного вызова sendfile. Java предоставляет метод для доступа к этому системному вызову: FileChannel.transferTo API. Для этого подхода требуется только одна копия: операционная система отправляет данные непосредственно из кэша страниц в сеть, и в этом оптимизированном пути требуется только последний шаг копирования данных в кэш сетевой карты.

零拷贝方式传输到 Socket
Эта технология на самом деле очень распространена.Есть также очень подробное введение в проблему C10K.Nginx также использует эту технологию.Вы можете найти много информации с небольшим поиском.

Секрет скорости Kafka в том, что он превращает все сообщения в один файл. Увеличение ввода/вывода с помощью mmap При записи данных он добавляется в конце, поэтому скорость оптимальная, при чтении данных напрямую выводится с помощью sendfile. Таким образом, нет никакого смысла просто тестировать скорость MQ.Насильственный подход Кафки снял нижнее белье MQ и больше похож на насильственный передатчик данных.