Руководство по началу работы с Kafka | Начал с Luochen

Kafka

Kafka теперь широко используется в нашей компании, например, для сбора логов для AdServer и системы сообщений для службы Counter и т. д.

В этой статье сначала будут представлены некоторые основные концепции Kafka, затем рассказывается, как построить кластер Kafka и как его использовать, и, наконец, кратко представлен принцип реализации хранилища файлов Kafka.

Введение в основные понятия

  • BrokerЕго можно просто понимать как узел Kafka, а несколько узлов Broker составляют весь кластер Kafka;
  • Topicнабор сообщений определенного типа;
    • PartitionЭто физическая группировка темы, и несколько разделов будут храниться в разных узлах Kafka децентрализованным образом; сообщения одного раздела гарантированно будут упорядочены, но сообщения всей темы не обязательно упорядочены;
    • SegmentФайл указанного размера, содержащий содержимое сообщения, состоящий из файла индекса и файла журнала; раздел состоит из нескольких файлов сегментов.
      • OffsetЗначение индекса сообщения в файле сегмента, считая от 0
    • Replica (N)Резервная копия сообщения, что означает, что каждый Раздел будет иметь N идентичных резервных копий, и эти резервные копии будут максимально распределены и сохранены на разных машинах;
  • ProducerПубликовать новые сообщения в тему через Брокер;
  • ConsumerПолучать сообщения из Темы через Брокера;

Как использовать Кафку

Во-первых, давайте представим, как построить кластер Kafka.Мы создаем кластер из 2 узлов на основе docker-compose,здесьпредставляет собой подробный вводный документ.

Создайте кластер Kafka

Сначала напишитеdocker-compose.ymlдокумент:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: test:1:1
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

вKAFKA_ADVERTISED_HOST_NAMEЕго необходимо заменить на ваш локальный IP-адрес, а неlocalhost 0.0.0.0такие адреса, как .KAFKA_CREATE_TOPICSЭто должно продемонстрировать, что некоторые темы по умолчанию могут быть созданы при запуске кластера Kafka;test:1:1Смысл в том, чтобы создать имя по умолчанию дляtest, Перегородка и тема с числом реплик, равным 1.

существуетdocker-compose.ymlКаталог, в котором находится файл, выполняетсяdocker-compose up -d --scale kafka=2Кластер Kafka с двумя узлами будет запущен локально:

➜  Kafka git:(master) docker-compose up -d --scale kafka=2
Creating network "kafka_default" with the default driver
Creating kafka_kafka_1     ... done
Creating kafka_kafka_2     ... done
Creating kafka_zookeeper_1 ... done
➜  Kafka git:(master) docker ps
CONTAINER ID        IMAGE                    COMMAND                  CREATED                  STATUS              PORTS                                                NAMES
d5927ffbd582        wurstmeister/kafka       "start-kafka.sh"         Less than a second ago   Up 6 seconds        0.0.0.0:32774->9092/tcp                              kafka_kafka_2
17916afee832        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   Less than a second ago   Up 7 seconds        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafka_zookeeper_1
578c02c01fd9        wurstmeister/kafka       "start-kafka.sh"         Less than a second ago   Up 6 seconds        0.0.0.0:32773->9092/tcp                              kafka_kafka_1

Кластеры Kafka двух узлов успешно запущены, и имена контейнеров, соответствующие узлам,kafka_kafka_1иkafka_kafka_2.

Продемонстрируйте создание и использование сообщений с помощью инструментов Cli.

Kafka официально поставляется с некоторыми инструментами cli, вы можете войти в контейнер для доступа к этим командам:

➜  Kafka git:(master) docker exec -it kafka_kafka_1 bash
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper kafka_zookeeper_1:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Приведенная выше команда выводит список всех тем текущего кластера Kafka.

Я сам предпочитаю получать доступ к кластеру Kafka непосредственно на хосте, для чего сначала необходимо установить kafka, доступ к которому в macOS можно получить черезbrew install kafkaустановить.

Метод использования после установки аналогичен описанному выше, например, перечисление всех тем:

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Далее мы покажем, как создавать и потреблять сообщения.

Создайте новую тему:

➜  Kafka git:(master) kafka-topics --create --topic chat --partitions 3 --zookeeper localhost:2181 --replication-factor 2
Created topic "chat".

Имя вновь созданной темы — чат, количество разделов — 3, количество реплик — 2. Вы можете проверить, успешно ли создана тема, выполнив следующую команду:

➜  Kafka git:(master) kafka-topics --describe --zookeeper localhost:2181
Topic:chat      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
        Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

Создание процессов производителя и потребителя

Как для производства, так и для потребления сообщений необходимо знать соответствующий адрес брокера. Если вы обращаетесь к нему на хосте докера, вам необходимо знать соответствующий сопоставленный порт. Мы можем получить его с помощью следующей команды:

Затем используйте следующие команды для создания производителей и потребителей сообщений соответственно:

kafka-console-producer --broker-list localhost:32773 --topic chat
kafka-console-consumer --bootstrap-server localhost:32773 --topic chat --from-beginning

Введите сообщение в источнике, и вы увидите вывод соответствующего сообщения в потребителе. Эффект показан на следующем рисунке:

в состоянии пройти<Ctrl-c>для выхода из обоих процессов.

Введение в принцип хранения файлов

Давайте рассмотрим некоторую предыдущую информацию о тематическом чате:

Topic:chat      PartitionCount:3        ReplicationFactor:2     Configs:
        Topic: chat     Partition: 0    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002
        Topic: chat     Partition: 1    Leader: 1002    Replicas: 1002,1001     Isr: 1002,1001
        Topic: chat     Partition: 2    Leader: 1001    Replicas: 1001,1002     Isr: 1001,1002

Из вышеизложенного видно, что узел с ID 1001 (kafka_kafka_1) хранит Лидерскую часть Раздела 0 и Раздела 2, а также хранит резервную копию Раздела 1.

Раздел распространяется на несколько узлов Kafka в соответствии со следующим алгоритмом:

  • Сортировка всех N брокеров и M разделов, которые будут выделены;
  • Назначить i-й Раздел (i mod N)-му Брокеру;
  • Назначьте j-ю реплику i-го Раздела ((i + j) mod N)-му Брокеру.

Далее, давайте посмотрим, как хранится раздел.

Мы можем войти на узел 1001, чтобы увидеть соответствующее файловое хранилище:

➜  blog git:(hexo) ✗ docker exec -it kafka_kafka_1 bash
bash-4.4# cd /kafka/kafka-logs-578c02c01fd9/
bash-4.4# ls -d chat*
chat-0  chat-1  chat-2

Вы можете видеть, что каждый раздел соответствует каталогу, и каждый каталог содержит файл индекса и файл журнала:

bash-4.4# ls -lh chat-0
total 16
-rw-r--r--    1 root     root       10.0M May  8 20:52 00000000000000000000.index
-rw-r--r--    1 root     root          77 May  8 20:35 00000000000000000000.log
-rw-r--r--    1 root     root       10.0M May  8 20:52 00000000000000000000.timeindex
-rw-r--r--    1 root     root          10 May  8 20:52 00000000000000000001.snapshot
-rw-r--r--    1 root     root           8 May  8 20:35 leader-epoch-checkpoint

В файле журнала хранится фактическое содержимое сообщения, а в индексном файле с таким же именем хранятся индексные данные сообщения.Имя файла журнала хранит значение смещения последнего сообщения в предыдущем файле журнала.

Вы можете найти сообщение, соответствующее указанному смещению, следующим образом

  • Сначала найдите соответствующий сегмент, соответствующий сегмент можно найти непосредственно с помощью двоичного поиска по имени файла;
  • Затем найти положение смещения в лог-файле последовательно в индексном файле сегмента, индексный файл будет отображен в память.

Суммировать

Kafka может обеспечить лучшие возможности параллелизма, указав несколько разделов для тем, и каждый раздел распределяется по разным узлам.В то же время для разделов также можно указать соответствующее количество реплик, что значительно увеличивает емкость хранилища данных.Безопасность для предотвращения потери данных.

Дизайн помощи в поиске сообщений на основе имен файлов по-прежнему очень умен!

Когда я впервые планировал написать эту статью, я хотел объяснить это, спроектировав сцену чата. Отправитель — производитель сообщения, получатель — потребитель сообщения, и для каждого пользователя создается соответствующая тема. Позже я почувствовал, что нагрузка была немного. Если она немного больше, я сдаюсь. Может быть, я хочу изучить Kafka SDK Gosaramaбудет реализовывать этот пример.

использованная литература