Эта статья принадлежит к оригиналу, перепечатайте и укажите источник, добро пожаловать, чтобы обратить внимание на апплет WeChat小白AI博客
Публичный аккаунт WeChat小白AI
или сайтxiaobaiai.netили мой CSDNblog.csdn.net/freeape
[TOC]
1. Введение
О других возможностях Kafka здесь рассказывать не буду, в основном эта статья посвящена построению кластерной среды разработки. Конкретный вводной контент, связанный с Кафкой, и сопутствующий анализ можно найти в другой статье «Углубленное понимание Кафки и прилегающих областей». Kafka зависит от Zookeeper, а также требуется для одной машины и одной службы. Команда поддержки Apacke Kafka начала обсуждение удаления Zookeeper (6 ноября 2019 г.), в настоящее время Kafka использует ZooKeeper для хранения метаданных для разделов и брокеров и выбирает брокера в качестве контроллера Kafka, и мы надеемся, что устранение зависимости от ZooKeeper позволит Kafka работать более масштабируемым и надежным способом. Управление метаданными, включение поддержки больше разделов, а также упростит развертывание и настройку Kafka. Но пока нам все еще нужен Zookeeper.
Zookeeper написан на Java, поэтому сначала необходимо установить JDK.
Эта статья в основном реализует следующее содержание:
- Одна машина, один Kafka Broker и один Zookeeper (просто запустите ZK и Kafka после установки по умолчанию)
- Автономный кластер Kafka Broker и кластер Zookeeper
- Многомашинный кластер Kafka Broker и кластер Zookeeper
Кроме того, Zookeeper поставляется с Kafka, поэтому здесь нет необходимости устанавливать Zookeeper, цель состоит в том, чтобы создать среду разработки. Среда установки — Ubuntu16.04.
2 Установка
2.1 OpenJDK
Здесь выберите установку OpenJDK 8:
# 软件源用的是清华的
$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk
# 多版本JDK管理和切换,怎么设置更加参考update-alternatives的用法
sudo update-alternatives --config java
# 查看安装后版本(openjdk version 1.8.0_222)
$ java -version
2.2 Версия Kafka и установка
2.2.1 Примечания к выпуску
Kafka также похожа на Hadoop, но отличается发行版
:
-
Apache Kafka
: Быстрая скорость итерации, высокая реакция сообщества, отсутствие расширенных функций, отсутствие инфраструктуры или инструментов мониторинга, среда мониторинга с открытым исходным кодом, такая как менеджер Kafka, коннектор (коннектор) относительно прост, нет коннектора для взаимодействия с другими внешними системами, вы нужно сделать кодирование самостоятельно; -
Confluent Kafka
: Бывшие сотрудники LinkedIn основали Confluent, сосредоточив внимание на предоставлении решений для потоковой обработки корпоративного уровня на основе Kafka, таких как резервное копирование между центрами обработки данных и Schema; Confluent Kafka в настоящее время делится на два типа: бесплатная версия и корпоративная версия. Бесплатная версия включает в себя Реестр схемы, REST Прокси-сервер имеет доступ к различным функциям и большему количеству коннекторов Kafka, но не имеет расширенных функций, таких как мониторинг кластера и резервное копирование между центрами обработки данных; -
Cloudera/Hortonworks Kafka
: его легко установить, развернуть, управлять и контролировать, но он снижает контроль над кафкой, которая управляется интерфейсом, а версия отстает от версии сообщества;
Две основные версии Kafka 1.0 и 2.0 в основном все ещеKafka Streams
Различные улучшения не добавили слишком много важных функций в механизм сообщений. Версия 0.11 является относительно стабильной с точки зрения обработки сообщений.
Версия Kafka, опубликованная на веб-сайте Apache, содержит следующую информацию:
Поскольку Kafka написана на языке Scala, на рисунке2.11
а также2.12
Относится к версии Scala, что означает скомпилированную Scala 2.11 или 2.12.Kafka2.3.1
Бинарная версия, а 2.3.1 — это версия Кафки.
2.2.2 Загрузите и установите
С официального сайтаkafka.apache.org/downloadsЗагрузите Kafka или загрузите его из проекта Releases на Github, Последняя версия на данный момент — 2.3.1 (выпущена 24 октября 2019 г.). Здесь мы загружаем бинарную версию напрямую, если вы загружаете исходную версию, вам нужно скомпилировать ее самостоятельно.
После декомпрессии все ок, поддержка естьLinux
а такжеWindows
запустить скрипт.
3 Кластер и конфигурация
3.1 Конфигурация по умолчанию (один компьютер, один Kafka Broker и один ZK)
- Файл конфигурации службы Kafka по умолчанию
./config/server.properties
- конфигурация идентификатора брокера
- каталог вывода файла журнала
/tmp/kafka-logs
- По умолчанию количество разделов журнала на тему равно 1.
- Связанная конфигурация количества потоков
- Связанные настройки размера буфера приема и отправки ввода-вывода
- Конфигурация адреса службы ZK, по умолчанию
localhost:2181
- Минимальный срок хранения лог-файлов, по умолчанию 168 часов.
- Порт прослушивания по умолчанию для службы:
9092
- ......
- Файл конфигурации ZK по умолчанию
./config/zookeeper.properties
- каталог данных снимка
/tmp/zookeeper
- сервисный порт
2181
- Количество подключений, используемых для каждого IP-адреса клиента, ограничено до неограниченного (0)
- ......
- каталог данных снимка
- Другие файлы конфигурации (здесь не буду раскрываться, я узнаю об этом больше, когда буду использовать)
./config/log4j.properties
./config/connect-log4j.properties
./config/producer.properties
./config/consumer.properties
./config/connect-console-sink.properties
./config/connect-console-source.properties
- ......
Запустите Zookeeper, состояние демона запустится:./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Запустите Kafka и запустите в фоновом режиме:./bin/kafka-server-start.sh config/server.properties &
Уведомление: Zookeeper должен быть запущен до запуска Kafka.Как упоминалось в предисловии, Kafka зависит от ZK.
3.2 Одномашинный кластер Kafka Broker и конфигурация кластера Zookeeper
Чтобы реализовать конфигурацию одномашинного кластера Kafka Broker и кластера Zookeeper, вам нужно всего лишь запустить несколько Brokers и ZK, установить разные порты прослушивания для каждой службы и задать разные каталоги журналов (здесь три брокера используются в качестве пример):
# Kafka 3个broker配置
# kafka broker1
broker.id=1
listeners = PLAINTEXT://localhost:9092
log.dir=/data/kafka/logs-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker2
broker.id=2
listeners = PLAINTEXT://localhost:9093
log.dir=/data/kafka/logs-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# kafka broker3
broker.id=3
listeners = PLAINTEXT://localhost:9094
log.dir=/data/kafka/logs-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# Zookeeper 3个节点配置
# zk 1
dataDir=/tmp/zookeeper-1
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 2
dataDir=/tmp/zookeeper-2
clientPort=2182
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
# zk 3
dataDir=/tmp/zookeeper-3
clientPort=2183
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=localhost:2666:3666
server.2=localhost:2667:3667
server.3=localhost:2668:3668
иллюстрировать: server.x в конфигурации ZK, этот x — Broker ID Kafka. Zookeeper имеет три службы, одна из которых является основной службой, а две другие — подчиненными службами.Если один сервер не работает, Zookeeper автоматически выберет лидера. Сервер прослушивает три порта, как в примере выше:2181
для клиентских подключений;2666
для подключения с сервера (если он ведущий);3666
Используется для подключения к другим серверам на этапе выбора лидера; сервер ZooKeeper работает в двух режимах: автономный и реплицированный режим (или режим кворума, реплицированный режим часто используется в производственных средах), а автономный режим означает, что есть только один сервер или только одна услуга.. Также в режиме копированияinitLimit
максимальное количество тактов, которое может быть допустимо во время начального соединения между подчиненным (подчиненным) сервером и ведущим (главным) сервером в кластере (tickTime
число), при этомtickTime
Это временной интервал для поддержания пульса между серверами Zookeeper или между клиентом и сервером.Значение по умолчанию — 3000, то есть каждыйtickTime
Время отправит сердцебиение в миллисекундах;syncLimit
максимальное количество тактов (количество тиков), допустимое между запросами и ответами между подчиненными и ведущими устройствами в кластере.
Запустите службу ZK по очереди:
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper1.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper2.properties
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper3.properties
но сообщу/tmp/zookeeper-1/myid file is missing
Это ненормально, потому что мы сейчас развертываем несколько сервисов на одной машине.Чтобы ZK мог идентифицировать каждый сервис, нам нужноdataDir
Этот параметр соответствует созданию каталога и настройкеmyid
файл, введите номер, т.е.server.x
изx
Это соответствует номеру.
После запуска всех сервисов ZK запускаем Kafka Broker один за другим:
$ ./bin/kafka-server-start.sh config/server1.properties &
$ ./bin/kafka-server-start.sh config/server2.properties &
$ ./bin/kafka-server-start.sh config/server3.properties &
3.3 Многомашинный кластер Kafka Broker и конфигурация кластера Zookeeper
Конфигурация многоуровневого кластера Kafka Broker и кластера Zookeeper такая же, как и у одномашинного кластера Kafka Broker с несколькими компьютерами + кластера Zookeeper. Однако порт ZK и порт Kafka могут быть одинаковыми.Обратите внимание, что IP-адрес соединения в это время является IP-адресом каждого хоста. Если у вас нет нескольких машин, вы можете использоватьDocker
для имитации реализации.
4 Конфигурация журнала
$KAFKA_HOME/bin/kafka-run-class.sh
5 экспериментов
5.1 Публикация сообщения и подписка
Затем мы используем инструмент сценария, предоставленный в Kafka, для проверки публикации/подписки темы.Когда есть только одна машина, один Брокер и один ZK,--zookeeper
,broker-list
а такжеbootstrap-server
Просто укажите один.
- использовать
kafka-topics.sh
Создание темы с одной репликой из одного разделаusers
.
# 创建后,主题会持久化到本地,重启服务后还有,需要用--delete选项删除
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic users
# 删除主题(主题已经在订阅的过程中是无法删除的)
$ ./bin/kafka-topics.sh --delete --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic users
- Ознакомьтесь с темами, которые мы создали
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181,localhost:2182,localhost:2183
- К теме
users
отправлять сообщения
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic users
> hello
> world
- получить сообщение и распечатать его в терминале
# --from-beginning 是指将历史未消费消息开始消费(针对同一个消费者,不同的消费者都会从最早的消息开始消费)
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic users --from-beginning
Затем вы можете получить сообщение, отправленное производителем.
Уведомление: Кафка запустится с версии 2.2
kafka-console-consumer.sh
,kafka-topics.sh
и т.д. в сценарии−−zookeeper
(С этим параметром сообщением управляет ZK) Параметр помечен как "устаревший", рекомендуется использовать−−bootstrap-server
параметр,−−bootstrap-server
Указывается не служебный адрес zookeeper, а служебный адрес Kafka, и сообщением управляет Kafka. Очевидно, смотритель зоопарка будет постепенно заменяться. Подробное описание реплик и разделов см. в разделе «Углубленное понимание Kafka и его окружения».
5.2 Импорт и экспорт данных Kafka Connect
Роль Kafka Connect Как видно из рисунка выше, данные можно импортировать из локальной базы данных или файла через источник Kafka Connect в кластер брокеров, а затем в приемник Kafka Connect (или в указанную тему, не показанную на рисунке), а затем потребителям или другим целевым базам данных. Здесь мы показываем запись данных в локальный текстовый файл, а затем реализуем указанный выше путь к данным. нужно использоватьconnect-standalone.sh
илиconnect-distributed.sh
(кластер Kafka Connect).
- Настройте файл конфигурации connect-standalone.sh
./config/connect-standalone.properties
, один Брокер может быть настроен без настройки
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
- настроить
connect-file-source.properties
Параметры (без каких-либо изменений, сохранить конфигурацию по умолчанию)
# 默认输入是文件流类型,这里主要是配置输入的文件名,和创建的主题
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
- настроить
connect-file-sink.properties
Параметры (без каких-либо изменений, сохранить конфигурацию по умолчанию)
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt topics=connect-test
- Источник ввода в локально созданной конфигурации
test.txt
echo -e "xiaobaiai.net\nethan" > test.txt
- начать импорт
$ ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- Просмотрите результаты, файл, сгенерированный в файле конфигурации стока, будет сгенерирован локально
test.sink.txt
, а если продолжатьtest.txt
Если содержимое введено в сгенерированный файл, содержимое также будет получено в этом сгенерированном файле или использовано напрямую.connect-test
Тело также может получить содержимое:
xiaobaiai.net
ethan
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"xiaobaiai.net"}
{"schema":{"type":"string","optional":false},"payload":"ethan"}
5.3 Эксперимент Kafka Streams
Подробнее см.Kafka.apache.org/23/document…, здесь пока не развернуто.
6 Другие инструкции по настройке Kafka
Пример:
# ---------------------------
# ./config/server.properties
# ---------------------------
# 默认设置创建主题时副本为1
default.replication.factor=1
# 指定创建主题时默认分区数为3
num.partitions=3
элемент конфигурации | тип | По умолчанию | Пример | описывать |
---|---|---|---|---|
broker.id | Целое число | 0 | 0 | идентификатор брокера kafka |
num.network.threads | Целое число | 3 | 3 | Количество потоков, которые Kafka принимает и отправляет сообщения, зависит от количества ядер процессора на машине. |
num.io.threads | Целое число | 8 | 8 | Количество потоков, которые сервер использует для обработки запросов ввода-вывода (возможно, включая дисковый ввод-вывод). |
socket.send.buffer.bytes | Целое число | 102400 | 102400 | Параметр конфигурации SO_SNDBUF ОС Linux, если -1, использовать конфигурацию ОС по умолчанию. |
socket.receive.buffer.bytes | Целое число | 102400 | 102400 | Параметр конфигурации SO_RCVBUF ОС Linux, если -1, использовать конфигурацию ОС по умолчанию. |
socket.request.max.bytes | Целое число | 104857600 | 104857600 | Сколько байтов является максимальным размером сообщения сокета |
log.dirs | нить | /tmp/kafka-logs | /tmp/kafka-logs | каталог данных кафки |
num.partitions | Целое число | 1 | 1 | Количество разделов по умолчанию |
num.recovery.threads.per.data.dir | Целое число | 1 | 1 | Количество потоков на каталог данных, используемых для восстановления журнала при запуске и сброса при завершении работы. |
offsets.topic.replication.factor | Целое число | 1 | 1 | При автоматическом создании темы, когда количество доступных узлов меньше этого числа, создание завершается ошибкой до тех пор, пока не будет достаточно доступных узлов. |
transaction.state.log.replication.factor | Целое число | 1 | 1 | Количество факторов репликации для тем транзакций |
transaction.state.log.min.isr | Целое число | 1 | 1 | Переопределить конфигурацию min.insync.replicas |
log.retention.hours | Целое число | 168 | 168 | Данные хранятся до 7 дней |
log.segment.bytes | Целое число | 1073741824 | 1073741824 | Тема в Kafka состоит из нескольких разделов, хранится один раздел и один файл сегмента, а новый файл сегмента будет создан при достижении порога log.segment.bytes или log.roll.hours (log.roll.ms). . |
log.retention.check.interval.ms | Целое число | 300000 | 300000 | интервал проверки очистки журнала, по умолчанию 5 минут |
zookeeper.connect | нить | localhost:2181 | localhost:2181 | конфигурация подключения службы zookeeper |
zookeeper.connection.timeout.ms | Целое число | 6000 | 6000 | настройка времени ожидания соединения zk |
group.initial.rebalance.delay.ms | Целое число | 0 | 0 | Когда к группе присоединяются другие потребительские процессы JVM, интервал времени с момента последней перебалансировки |
Больше можно найти по адресу:kafka.apache.org/document ATI…
7 Расширение
Kafka — это высокодоступный сервис с несколькими архитектурами брокеров. Тема соответствует нескольким разделам, а раздел может иметь несколько реплик. Эти реплики хранятся в нескольких брокерах для обеспечения высокой доступности. Однако, несмотря на наличие нескольких наборов реплик раздела, имеется только один рабочий набор реплик. По умолчанию первый выделенный набор реплик (предпочтительная реплика) является лидером, отвечающим за запись и чтение данных. Когда мы обновляем агент или обновляем конфигурацию агента, нам нужно перезапустить службу, а затем нам нужно перенести раздел на доступный агент. Здесь есть три случая:
- Непосредственное завершение работы брокера: когда брокер закрывается, кластер брокера переизбирает нового брокера в качестве лидера раздела, и раздел на брокере будет недоступен в течение короткого периода времени во время выборов.
- Включите контролируемое завершение работы: когда агент отключается, сам агент сначала попытается передать роль лидера другим доступным агентам.
- Использование инструментов командной строки: используйте
bin/kafka-preferred-replica-election.sh
Вручную инициировать смену роли лидера раздела
8 Резюме
Эта статья является первой частью практики.В ней реализуется построение кластерной среды разработки Kafka, проводятся эксперименты по созданию топиков, публикации сообщений и подписке.В следующей статье будет реализован Spring Boot для интеграции Kafka, продолжайте!
9 ссылок
- GitHub.com/Apache/Кафка…
- kafka.apache.org/downloads
- medium.com/@Кира на PS11/…
- http://https//segmentfault.com/q/1010000010623287
- zookeeper.apache.org/doc/th 3.1.2/…
- Woohoo.core Java Stakes.com/big data/zoo…
- zookeeper.apache.org/doc/th 3.2.2/…
- docs.confluent.IO/current/zoo…
- spring.IO/проекты/билеты…
- kafka.apache.org/quickstart
- kafka.apache.org/document ATI…
Эта статья принадлежит к оригиналу, перепечатайте и укажите источник, добро пожаловать, чтобы обратить внимание на CSDNfreeapeили апплет WeChat小白AI博客
Публичный аккаунт WeChat小白AI
или сайтxiaobaiai.net