Kafka теперь широко используется в нашей компании, например, для сбора логов для AdServer и системы сообщений для службы Counter и т. д.
В этой статье сначала будут представлены некоторые основные концепции Kafka, затем рассказывается, как построить кластер Kafka и как его использовать, и, наконец, кратко представлен принцип реализации хранилища файлов Kafka.
Введение в основные понятия
-
Broker
Его можно просто понимать как узел Kafka, а несколько узлов Broker составляют весь кластер Kafka; -
Topic
набор сообщений определенного типа;-
Partition
Это физическая группировка темы, и несколько разделов будут храниться в разных узлах Kafka децентрализованным образом; сообщения одного раздела гарантированно будут упорядочены, но сообщения всей темы не обязательно упорядочены; -
Segment
Файл указанного размера, содержащий содержимое сообщения, состоящий из файла индекса и файла журнала; раздел состоит из нескольких файлов сегментов.-
Offset
Значение индекса сообщения в файле сегмента, считая от 0
-
-
Replica (N)
Резервная копия сообщения, что означает, что каждый Раздел будет иметь N идентичных резервных копий, и эти резервные копии будут максимально распределены и сохранены на разных машинах;
-
-
Producer
Публиковать новые сообщения в тему через Брокер; -
Consumer
Получать сообщения из Темы через Брокера;
Как использовать Кафку
Во-первых, давайте представим, как построить кластер Kafka.Мы создаем кластер из 2 узлов на основе docker-compose,здесьпредставляет собой подробный вводный документ.
Создайте кластер Kafka
Сначала напишитеdocker-compose.yml
документ:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: test:1:1
volumes:
- /var/run/docker.sock:/var/run/docker.sock
вKAFKA_ADVERTISED_HOST_NAME
Его необходимо заменить на ваш локальный IP-адрес, а неlocalhost
0.0.0.0
такие адреса, как .KAFKA_CREATE_TOPICS
Это должно продемонстрировать, что некоторые темы по умолчанию могут быть созданы при запуске кластера Kafka;test:1:1
Смысл в том, чтобы создать имя по умолчанию дляtest
, Перегородка
и тема с числом реплик, равным 1.
существуетdocker-compose.yml
Каталог, в котором находится файл, выполняетсяdocker-compose up -d --scale kafka=2
Кластер Kafka с двумя узлами будет запущен локально:
➜ Kafka git:(master) docker-compose up -d --scale kafka=2
Creating network "kafka_default" with the default driver
Creating kafka_kafka_1 ... done
Creating kafka_kafka_2 ... done
Creating kafka_zookeeper_1 ... done
➜ Kafka git:(master) docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d5927ffbd582 wurstmeister/kafka "start-kafka.sh" Less than a second ago Up 6 seconds 0.0.0.0:32774->9092/tcp kafka_kafka_2
17916afee832 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" Less than a second ago Up 7 seconds 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka_zookeeper_1
578c02c01fd9 wurstmeister/kafka "start-kafka.sh" Less than a second ago Up 6 seconds 0.0.0.0:32773->9092/tcp kafka_kafka_1
Кластеры Kafka двух узлов успешно запущены, и имена контейнеров, соответствующие узлам,kafka_kafka_1
иkafka_kafka_2
.
Продемонстрируйте создание и использование сообщений с помощью инструментов Cli.
Kafka официально поставляется с некоторыми инструментами cli, вы можете войти в контейнер для доступа к этим командам:
➜ Kafka git:(master) docker exec -it kafka_kafka_1 bash
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Приведенная выше команда выводит список всех тем текущего кластера Kafka.
Я сам предпочитаю получать доступ к кластеру Kafka непосредственно на хосте, для чего сначала необходимо установить kafka, доступ к которому в macOS можно получить черезbrew install kafka
установить.
Метод использования после установки аналогичен описанному выше, например, перечисление всех тем:
➜ Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Далее мы покажем, как создавать и потреблять сообщения.
Создайте новую тему:
➜ Kafka git:(master) kafka-topics --create --topic chat --partitions 3 --zookeeper localhost:2181 --replication-factor 2
Created topic "chat".
Имя вновь созданной темы — чат, количество разделов — 3, количество реплик — 2. Вы можете проверить, успешно ли создана тема, выполнив следующую команду:
➜ Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:chat PartitionCount:3 ReplicationFactor:2 Configs:
Topic: chat Partition: 0 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: chat Partition: 1 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: chat Partition: 2 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Создание процессов производителя и потребителя
Как для производства, так и для потребления сообщений необходимо знать соответствующий адрес брокера. Если вы обращаетесь к нему на хосте докера, вам необходимо знать соответствующий сопоставленный порт. Мы можем получить его с помощью следующей команды:
Затем используйте следующие команды для создания производителей и потребителей сообщений соответственно:
kafka-console-producer --broker-list localhost:32773 --topic chat
kafka-console-consumer --bootstrap-server localhost:32773 --topic chat --from-beginning
Введите сообщение в источнике, и вы увидите вывод соответствующего сообщения в потребителе. Эффект показан на следующем рисунке:
в состоянии пройти<Ctrl-c>
для выхода из обоих процессов.
Введение в принцип хранения файлов
Давайте рассмотрим некоторую предыдущую информацию о тематическом чате:
Topic:chat PartitionCount:3 ReplicationFactor:2 Configs:
Topic: chat Partition: 0 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Topic: chat Partition: 1 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001
Topic: chat Partition: 2 Leader: 1001 Replicas: 1001,1002 Isr: 1001,1002
Из вышеизложенного видно, что узел с ID 1001 (kafka_kafka_1) хранит Лидерскую часть Раздела 0 и Раздела 2, а также хранит резервную копию Раздела 1.
Раздел распространяется на несколько узлов Kafka в соответствии со следующим алгоритмом:
- Сортировка всех N брокеров и M разделов, которые будут выделены;
- Назначить i-й Раздел (i mod N)-му Брокеру;
- Назначьте j-ю реплику i-го Раздела ((i + j) mod N)-му Брокеру.
Далее, давайте посмотрим, как хранится раздел.
Мы можем войти на узел 1001, чтобы увидеть соответствующее файловое хранилище:
➜ blog git:(hexo) ✗ docker exec -it kafka_kafka_1 bash
bash-4.4# cd /kafka/kafka-logs-578c02c01fd9/
bash-4.4# ls -d chat*
chat-0 chat-1 chat-2
Вы можете видеть, что каждый раздел соответствует каталогу, и каждый каталог содержит файл индекса и файл журнала:
bash-4.4# ls -lh chat-0
total 16
-rw-r--r-- 1 root root 10.0M May 8 20:52 00000000000000000000.index
-rw-r--r-- 1 root root 77 May 8 20:35 00000000000000000000.log
-rw-r--r-- 1 root root 10.0M May 8 20:52 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10 May 8 20:52 00000000000000000001.snapshot
-rw-r--r-- 1 root root 8 May 8 20:35 leader-epoch-checkpoint
В файле журнала хранится фактическое содержимое сообщения, а в индексном файле с таким же именем хранятся индексные данные сообщения.Имя файла журнала хранит значение смещения последнего сообщения в предыдущем файле журнала.
Вы можете найти сообщение, соответствующее указанному смещению, следующим образом
- Сначала найдите соответствующий сегмент, соответствующий сегмент можно найти непосредственно с помощью двоичного поиска по имени файла;
- Затем найти положение смещения в лог-файле последовательно в индексном файле сегмента, индексный файл будет отображен в память.
Суммировать
Kafka может обеспечить лучшие возможности параллелизма, указав несколько разделов для тем, и каждый раздел распределяется по разным узлам.В то же время для разделов также можно указать соответствующее количество реплик, что значительно увеличивает емкость хранилища данных.Безопасность для предотвращения потери данных.
Дизайн помощи в поиске сообщений на основе имен файлов по-прежнему очень умен!
Когда я впервые планировал написать эту статью, я хотел объяснить это, спроектировав сцену чата. Отправитель — производитель сообщения, получатель — потребитель сообщения, и для каждого пользователя создается соответствующая тема. Позже я почувствовал, что нагрузка была немного. Если она немного больше, я сдаюсь. Может быть, я хочу изучить Kafka SDK Gosaramaбудет реализовывать этот пример.