Создание и экспериментирование среды разработки или развертывания кластера Kafka, ZK

Архитектура Kafka

Эта статья принадлежит к оригиналу, перепечатайте и укажите источник, добро пожаловать, чтобы обратить внимание на апплет WeChat小白AI博客Публичный аккаунт WeChat小白AIили сайтxiaobaiai.netили мой CSDNblog.csdn.net/freeape

[TOC]

1. Введение

О других возможностях Kafka здесь рассказывать не буду, в основном эта статья посвящена построению кластерной среды разработки. Конкретный вводной контент, связанный с Кафкой, и сопутствующий анализ можно найти в другой статье «Углубленное понимание Кафки и прилегающих областей». Kafka зависит от Zookeeper, а также требуется для одной машины и одной службы. Команда поддержки Apacke Kafka начала обсуждение удаления Zookeeper (6 ноября 2019 г.), в настоящее время Kafka использует ZooKeeper для хранения метаданных для разделов и брокеров и выбирает брокера в качестве контроллера Kafka, и мы надеемся, что устранение зависимости от ZooKeeper позволит Kafka работать более масштабируемым и надежным способом. Управление метаданными, включение поддержки больше разделов, а также упростит развертывание и настройку Kafka. Но пока нам все еще нужен Zookeeper.

Zookeeper написан на Java, поэтому сначала необходимо установить JDK.

Эта статья в основном реализует следующее содержание:

  • Одна машина, один Kafka Broker и один Zookeeper (просто запустите ZK и Kafka после установки по умолчанию)
  • Автономный кластер Kafka Broker и кластер Zookeeper
  • Многомашинный кластер Kafka Broker и кластер Zookeeper

Кроме того, Zookeeper поставляется с Kafka, поэтому здесь нет необходимости устанавливать Zookeeper, цель состоит в том, чтобы создать среду разработки. Среда установки — Ubuntu16.04.

2 Установка

2.1 OpenJDK

Здесь выберите установку OpenJDK 8:

# 软件源用的是清华的
$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk
# 多版本JDK管理和切换,怎么设置更加参考update-alternatives的用法
sudo update-alternatives --config java
# 查看安装后版本(openjdk version 1.8.0_222)
$ java -version

2.2 Версия Kafka и установка

2.2.1 Примечания к выпуску

Kafka также похожа на Hadoop, но отличается发行版:

  • Apache Kafka: Быстрая скорость итерации, высокая реакция сообщества, отсутствие расширенных функций, отсутствие инфраструктуры или инструментов мониторинга, среда мониторинга с открытым исходным кодом, такая как менеджер Kafka, коннектор (коннектор) относительно прост, нет коннектора для взаимодействия с другими внешними системами, вы нужно сделать кодирование самостоятельно;
  • Confluent Kafka: Бывшие сотрудники LinkedIn основали Confluent, сосредоточив внимание на предоставлении решений для потоковой обработки корпоративного уровня на основе Kafka, таких как резервное копирование между центрами обработки данных и Schema; Confluent Kafka в настоящее время делится на два типа: бесплатная версия и корпоративная версия. Бесплатная версия включает в себя Реестр схемы, REST Прокси-сервер имеет доступ к различным функциям и большему количеству коннекторов Kafka, но не имеет расширенных функций, таких как мониторинг кластера и резервное копирование между центрами обработки данных;
  • Cloudera/Hortonworks Kafka: его легко установить, развернуть, управлять и контролировать, но он снижает контроль над кафкой, которая управляется интерфейсом, а версия отстает от версии сообщества;

Две основные версии Kafka 1.0 и 2.0 в основном все ещеKafka StreamsРазличные улучшения не добавили слишком много важных функций в механизм сообщений. Версия 0.11 является относительно стабильной с точки зрения обработки сообщений.

Версия Kafka, опубликованная на веб-сайте Apache, содержит следующую информацию:

Поскольку Kafka написана на языке Scala, на рисунке2.11а также2.12Относится к версии Scala, что означает скомпилированную Scala 2.11 или 2.12.Kafka2.3.1Бинарная версия, а 2.3.1 — это версия Кафки.

2.2.2 Загрузите и установите

С официального сайтаkafka.apache.org/downloadsЗагрузите Kafka или загрузите его из проекта Releases на Github, Последняя версия на данный момент — 2.3.1 (выпущена 24 октября 2019 г.). Здесь мы загружаем бинарную версию напрямую, если вы загружаете исходную версию, вам нужно скомпилировать ее самостоятельно.

После декомпрессии все ок, поддержка естьLinuxа такжеWindowsзапустить скрипт.

3 Кластер и конфигурация

3.1 Конфигурация по умолчанию (один компьютер, один Kafka Broker и один ZK)

  • Файл конфигурации службы Kafka по умолчанию./config/server.properties
    • конфигурация идентификатора брокера
    • каталог вывода файла журнала/tmp/kafka-logs
    • По умолчанию количество разделов журнала на тему равно 1.
    • Связанная конфигурация количества потоков
    • Связанные настройки размера буфера приема и отправки ввода-вывода
    • Конфигурация адреса службы ZK, по умолчаниюlocalhost:2181
    • Минимальный срок хранения лог-файлов, по умолчанию 168 часов.
    • Порт прослушивания по умолчанию для службы:9092
    • ......
  • Файл конфигурации ZK по умолчанию./config/zookeeper.properties
    • каталог данных снимка/tmp/zookeeper
    • сервисный порт2181
    • Количество подключений, используемых для каждого IP-адреса клиента, ограничено до неограниченного (0)
    • ......
  • Другие файлы конфигурации (здесь не буду раскрываться, я узнаю об этом больше, когда буду использовать)
    • ./config/log4j.properties
    • ./config/connect-log4j.properties
    • ./config/producer.properties
    • ./config/consumer.properties
    • ./config/connect-console-sink.properties
    • ./config/connect-console-source.properties
    • ......

Запустите Zookeeper, состояние демона запустится:./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Запустите Kafka и запустите в фоновом режиме:./bin/kafka-server-start.sh config/server.properties &

Уведомление: Zookeeper должен быть запущен до запуска Kafka.Как упоминалось в предисловии, Kafka зависит от ZK.

3.2 Одномашинный кластер Kafka Broker и конфигурация кластера Zookeeper

Чтобы реализовать конфигурацию одномашинного кластера Kafka Broker и кластера Zookeeper, вам нужно всего лишь запустить несколько Brokers и ZK, установить разные порты прослушивания для каждой службы и задать разные каталоги журналов (здесь три брокера используются в качестве пример):

# Kafka 3个broker配置
# kafka broker1
broker.id=1
listeners = PLAINTEXT://localhost:9092
log.dir=/data/kafka/logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker2
broker.id=2
listeners = PLAINTEXT://localhost:9093
log.dir=/data/kafka/logs-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker3
broker.id=3
listeners = PLAINTEXT://localhost:9094
log.dir=/data/kafka/logs-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# Zookeeper 3个节点配置
# zk 1
dataDir=/tmp/zookeeper-1
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 2
dataDir=/tmp/zookeeper-2
clientPort=2182
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 3
dataDir=/tmp/zookeeper-3
clientPort=2183
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668

иллюстрировать: server.x в конфигурации ZK, этот x — Broker ID Kafka. Zookeeper имеет три службы, одна из которых является основной службой, а две другие — подчиненными службами.Если один сервер не работает, Zookeeper автоматически выберет лидера. Сервер прослушивает три порта, как в примере выше:2181для клиентских подключений;2666для подключения с сервера (если он ведущий);3666Используется для подключения к другим серверам на этапе выбора лидера; сервер ZooKeeper работает в двух режимах: автономный и реплицированный режим (или режим кворума, реплицированный режим часто используется в производственных средах), а автономный режим означает, что есть только один сервер или только одна услуга.. Также в режиме копированияinitLimitмаксимальное количество тактов, которое может быть допустимо во время начального соединения между подчиненным (подчиненным) сервером и ведущим (главным) сервером в кластере (tickTimeчисло), при этомtickTimeЭто временной интервал для поддержания пульса между серверами Zookeeper или между клиентом и сервером.Значение по умолчанию — 3000, то есть каждыйtickTimeВремя отправит сердцебиение в миллисекундах;syncLimitмаксимальное количество тактов (количество тиков), допустимое между запросами и ответами между подчиненными и ведущими устройствами в кластере.

Запустите службу ZK по очереди:

$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper1.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper2.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper3.properties

но сообщу/tmp/zookeeper-1/myid file is missingЭто ненормально, потому что мы сейчас развертываем несколько сервисов на одной машине.Чтобы ZK мог идентифицировать каждый сервис, нам нужноdataDirЭтот параметр соответствует созданию каталога и настройкеmyidфайл, введите номер, т.е.server.xизxЭто соответствует номеру.

После запуска всех сервисов ZK запускаем Kafka Broker один за другим:

$ ./bin/kafka-server-start.sh  config/server1.properties &
$ ./bin/kafka-server-start.sh  config/server2.properties &
$ ./bin/kafka-server-start.sh  config/server3.properties &

3.3 Многомашинный кластер Kafka Broker и конфигурация кластера Zookeeper

Конфигурация многоуровневого кластера Kafka Broker и кластера Zookeeper такая же, как и у одномашинного кластера Kafka Broker с несколькими компьютерами + кластера Zookeeper. Однако порт ZK и порт Kafka могут быть одинаковыми.Обратите внимание, что IP-адрес соединения в это время является IP-адресом каждого хоста. Если у вас нет нескольких машин, вы можете использоватьDockerдля имитации реализации.

4 Конфигурация журнала

$KAFKA_HOME/bin/kafka-run-class.sh

5 экспериментов

5.1 Публикация сообщения и подписка

Затем мы используем инструмент сценария, предоставленный в Kafka, для проверки публикации/подписки темы.Когда есть только одна машина, один Брокер и один ZK,--zookeeper,broker-listа такжеbootstrap-serverПросто укажите один.

  1. использоватьkafka-topics.shСоздание темы с одной репликой из одного разделаusers.
# 创建后,主题会持久化到本地,重启服务后还有,需要用--delete选项删除
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic users
# 删除主题(主题已经在订阅的过程中是无法删除的)
$ ./bin/kafka-topics.sh --delete --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic users
  1. Ознакомьтесь с темами, которые мы создали
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181,localhost:2182,localhost:2183
  1. К темеusersотправлять сообщения
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic users
> hello
> world
  1. получить сообщение и распечатать его в терминале
# --from-beginning 是指将历史未消费消息开始消费(针对同一个消费者,不同的消费者都会从最早的消息开始消费)
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic users --from-beginning

Затем вы можете получить сообщение, отправленное производителем.

Уведомление: Кафка запустится с версии 2.2kafka-console-consumer.sh,kafka-topics.shи т.д. в сценарии−−zookeeper(С этим параметром сообщением управляет ZK) Параметр помечен как "устаревший", рекомендуется использовать−−bootstrap-serverпараметр,−−bootstrap-serverУказывается не служебный адрес zookeeper, а служебный адрес Kafka, и сообщением управляет Kafka. Очевидно, смотритель зоопарка будет постепенно заменяться. Подробное описание реплик и разделов см. в разделе «Углубленное понимание Kafka и его окружения».

5.2 Импорт и экспорт данных Kafka Connect

Роль Kafka Connect Как видно из рисунка выше, данные можно импортировать из локальной базы данных или файла через источник Kafka Connect в кластер брокеров, а затем в приемник Kafka Connect (или в указанную тему, не показанную на рисунке), а затем потребителям или другим целевым базам данных. Здесь мы показываем запись данных в локальный текстовый файл, а затем реализуем указанный выше путь к данным. нужно использоватьconnect-standalone.shилиconnect-distributed.sh(кластер Kafka Connect).

  1. Настройте файл конфигурации connect-standalone.sh./config/connect-standalone.properties, один Брокер может быть настроен без настройки
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
  1. настроитьconnect-file-source.propertiesПараметры (без каких-либо изменений, сохранить конфигурацию по умолчанию)
# 默认输入是文件流类型,这里主要是配置输入的文件名,和创建的主题
name=local-file-source                                                                                                                      
connector.class=FileStreamSource                                                                                                                             
tasks.max=1                                                                                                                                                  
file=test.txt                                                                                                                                                
topic=connect-test
  1. настроитьconnect-file-sink.propertiesПараметры (без каких-либо изменений, сохранить конфигурацию по умолчанию)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1    
file=test.sink.txt                                                                                                                                           topics=connect-test   
  1. Источник ввода в локально созданной конфигурацииtest.txt
echo -e "xiaobaiai.net\nethan" > test.txt
  1. начать импорт
$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  1. Просмотрите результаты, файл, сгенерированный в файле конфигурации стока, будет сгенерирован локальноtest.sink.txt, а если продолжатьtest.txtЕсли содержимое введено в сгенерированный файл, содержимое также будет получено в этом сгенерированном файле или использовано напрямую.connect-testТело также может получить содержимое:
xiaobaiai.net
ethan
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"xiaobaiai.net"}
{"schema":{"type":"string","optional":false},"payload":"ethan"}

5.3 Эксперимент Kafka Streams

Подробнее см.Kafka.apache.org/23/document…, здесь пока не развернуто.

6 Другие инструкции по настройке Kafka

Пример:

# ---------------------------
# ./config/server.properties
# ---------------------------
# 默认设置创建主题时副本为1
default.replication.factor=1
# 指定创建主题时默认分区数为3
num.partitions=3
элемент конфигурации тип По умолчанию Пример описывать
broker.id Целое число 0 0 идентификатор брокера kafka
num.network.threads Целое число 3 3 Количество потоков, которые Kafka принимает и отправляет сообщения, зависит от количества ядер процессора на машине.
num.io.threads Целое число 8 8 Количество потоков, которые сервер использует для обработки запросов ввода-вывода (возможно, включая дисковый ввод-вывод).
socket.send.buffer.bytes Целое число 102400 102400 Параметр конфигурации SO_SNDBUF ОС Linux, если -1, использовать конфигурацию ОС по умолчанию.
socket.receive.buffer.bytes Целое число 102400 102400 Параметр конфигурации SO_RCVBUF ОС Linux, если -1, использовать конфигурацию ОС по умолчанию.
socket.request.max.bytes Целое число 104857600 104857600 Сколько байтов является максимальным размером сообщения сокета
log.dirs нить /tmp/kafka-logs /tmp/kafka-logs каталог данных кафки
num.partitions Целое число 1 1 Количество разделов по умолчанию
num.recovery.threads.per.data.dir Целое число 1 1 Количество потоков на каталог данных, используемых для восстановления журнала при запуске и сброса при завершении работы.
offsets.topic.replication.factor Целое число 1 1 При автоматическом создании темы, когда количество доступных узлов меньше этого числа, создание завершается ошибкой до тех пор, пока не будет достаточно доступных узлов.
transaction.state.log.replication.factor Целое число 1 1 Количество факторов репликации для тем транзакций
transaction.state.log.min.isr Целое число 1 1 Переопределить конфигурацию min.insync.replicas
log.retention.hours Целое число 168 168 Данные хранятся до 7 дней
log.segment.bytes Целое число 1073741824 1073741824 Тема в Kafka состоит из нескольких разделов, хранится один раздел и один файл сегмента, а новый файл сегмента будет создан при достижении порога log.segment.bytes или log.roll.hours (log.roll.ms). .
log.retention.check.interval.ms Целое число 300000 300000 интервал проверки очистки журнала, по умолчанию 5 минут
zookeeper.connect нить localhost:2181 localhost:2181 конфигурация подключения службы zookeeper
zookeeper.connection.timeout.ms Целое число 6000 6000 настройка времени ожидания соединения zk
group.initial.rebalance.delay.ms Целое число 0 0 Когда к группе присоединяются другие потребительские процессы JVM, интервал времени с момента последней перебалансировки

Больше можно найти по адресу:kafka.apache.org/document ATI…

7 Расширение

Kafka — это высокодоступный сервис с несколькими архитектурами брокеров. Тема соответствует нескольким разделам, а раздел может иметь несколько реплик. Эти реплики хранятся в нескольких брокерах для обеспечения высокой доступности. Однако, несмотря на наличие нескольких наборов реплик раздела, имеется только один рабочий набор реплик. По умолчанию первый выделенный набор реплик (предпочтительная реплика) является лидером, отвечающим за запись и чтение данных. Когда мы обновляем агент или обновляем конфигурацию агента, нам нужно перезапустить службу, а затем нам нужно перенести раздел на доступный агент. Здесь есть три случая:

  • Непосредственное завершение работы брокера: когда брокер закрывается, кластер брокера переизбирает нового брокера в качестве лидера раздела, и раздел на брокере будет недоступен в течение короткого периода времени во время выборов.
  • Включите контролируемое завершение работы: когда агент отключается, сам агент сначала попытается передать роль лидера другим доступным агентам.
  • Использование инструментов командной строки: используйтеbin/kafka-preferred-replica-election.shВручную инициировать смену роли лидера раздела

8 Резюме

Эта статья является первой частью практики.В ней реализуется построение кластерной среды разработки Kafka, проводятся эксперименты по созданию топиков, публикации сообщений и подписке.В следующей статье будет реализован Spring Boot для интеграции Kafka, продолжайте!

9 ссылок

Эта статья принадлежит к оригиналу, перепечатайте и укажите источник, добро пожаловать, чтобы обратить внимание на CSDNfreeapeили апплет WeChat小白AI博客Публичный аккаунт WeChat小白AIили сайтxiaobaiai.net