Всем привет, меня зовут Се Вэй, я программист.
Сегодняшняя тема: Руководство по началу работы с Kafka — Cluster Edition
Обзор помещения:кафка с одним узлом
1. Основные понятия
В системе сообщений задействованные концепции относительно схожи.При изучении системы сообщений концепции иногда плохо понимаются, и читатели должны возвращаться и неоднократно разъяснять основные концепции в соответствии с их собственным прогрессом в обучении.
Основные понятия представлены в формате вопросов и ответов ниже:
- Что такое брокер?
Проще говоря, сервер kafka — это брокер.
- Что такое продюсер-продюсер?
Проще говоря, система, предоставляющая сообщения, называется производителем.
- Что такое потребительский потребитель?
Проще говоря, система, которая обрабатывает сообщения, называется потребителем.
- Что такое тема?
Проще говоря, чтобы различать разные типы сообщений, даются искусственные имена, поэтому тема является логическим понятием.
- Что такое раздел раздела?
Проще говоря, это сущность, хранящая сообщения, то есть разделяющая тему на разные разделы. На физическом уровне это папка, названная по теме-N, и журнал сообщений хранится в папке. Конечно, партиция может быть на одном брокере или на разных брокерах, если вы используете кластерную версию kafka.
topic-go-0
topic-go-1
topic-go-2
...
- Что такое смещение?
Проще говоря, это число, представляющее смещение. Используется для обозначения потребителей. Например, если я отправлю вам 100 сообщений, как я узнаю, сколько вы израсходовали?
- Что такое потребительская группа?
Проще говоря, группа потребителей вместе потребляет одну или несколько тем.Конечно, потребитель потребляет сообщения в одном или нескольких разделах. Зачем нужны потребители и группы потребителей? Чтобы использовать сообщения, потребители должны подписаться на тему, а группа потребителей использует одну или несколько тем одновременно.Эффектом этого является масштабируемость и отказоустойчивость. Масштабируемость означает, что добавление нового потребителя может взять на себя некоторые задачи и снизить нагрузку на других потребителей; точно так же уменьшить потребителя и перераспределить сообщения потребителям. Этот механизм распределения называется в системе kafka: Rebalance, динамическая настройка.
Так когда будет ребаланс?
- Изменение количества потребителей
- смена темы
- изменения зонирования
Среди них изменение количества потребителей является наиболее распространенным сценарием. Ребаланс имеет свои преимущества и недостатки Достоинства: масштабируемость и отказоустойчивость Недостатки: Ребаланс потребляет больше производительности и в определенный момент перестанет потреблять сообщения.
- Что такое кафка-кластер?
Проще говоря, кластер — это набор сервисов, типичная особенность: несколько машин и несколько сервисов. Эта функция может обеспечить высокую доступность и высокий уровень параллелизма системы. Внутри системы могут обнаруживать друг друга через zookeeper, метаданные и т. д., внешне это похоже на использование одного сервиса.
- Как контролировать размер «емкости»?
Файлы конфигурации, например, как я могу гарантировать, что производитель может отправлять сообщения точно, например, несколько разделов, какой стратегии разделения я придерживаюсь, например, следует ли сжимать сообщения производителя и какой метод сжатия использовать; например, потребители потребляют от самого последнего или самого старого потребления сообщений; например, какова стратегия перебалансировки группы потребителей?
Эти характеристики, я называю это размером способности, размером этих способностей, пользователь должен быть достаточно хорошо знаком, чтобы играть своими способностями или уметь анализировать конкретные проблемы.
- Конфигурация «возможностей» брокера
- Конфигурация «возможностей» производителя
- Конфигурация потребительских «возможностей»
- Конфигурация группы потребителей «Возможности»
2. Конфигурация
Файл конфигурации при запуске службы также является общим способом запуска большинства служб, таких как службы базы данных MySQL, такие как службы Redis и т. д., которые все настраиваются при запуске, чтобы предоставить им возможности.
broker
# 目录
config/server.properties
- каталог хранения сообщений log.dirs, может быть несколько
log.dirs=/kafka/kafka-logs-kfk1
- zookeeper.connect , может быть несколько, используется в кластерном режиме
zookeeper.connect=zookeeper-1:2181
- рекламируемый.listeners внешний адрес
advertised.listeners=PLAINTEXT://kfk1:9092
- протокол безопасности listener.security.protocol.map
listener.security.protocol.map=CONTROLLER:PLAINTEXT
Как правило, эти конфигурации подходят, другие значения по умолчанию, среди которых log.dirs, zookeeper.connect, являются наиболее важными.
topic
- auto.create.topics.enable разрешить ли автоматическое создание тем
auto.create.topics.enable=false
После запуска службы, как правило, через клиентский инструмент, напишите код для завершения соответствующих настроек.
В go клиент kafka использует:sarama
type config struct {
Producer struct {
...
}
Consumer struct {
...
Group struct {
...
}
}
}
- Настройте для потребителей, настройте config.Consumer
- Настройте для производителя, настройте config.Producer
- Настройте группы потребителей, настройте config.Consumer.Group
потребитель:
c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 1024 * 1024
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest
c.Consumer.Offsets.Retry.Max = 3
Среди них общий по умолчанию, иначе настройте:
- Возвращать ли ошибки: c.Consumer.Return.Errors
- Начальное значение потребления: c.Consumer.Offsets.Initial
- Механизм повтора: Повторить
Режиссер:
// 消息的最大值大概 1MB
c.Producer.MaxMessageBytes = 1000000
// 消息是否应答:0: 不应答,禁用;1: leader 收到即可 ; -1: 所有的副本都收到
c.Producer.RequiredAcks = WaitForLocal
c.Producer.Timeout = 10 * time.Second
// 分区策略:随机、轮询、hash 等
c.Producer.Partitioner = NewHashPartitioner
// 重试机制
c.Producer.Retry.Max = 3
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.Return.Errors = true
// 压缩算法:gzip, zstd, lz4, snappy
c.Producer.CompressionLevel = CompressionLevelDefault
Группа потребителей:
// 间隔
c.Consumer.Group.Session.Timeout = 10 * time.Second
// 心跳
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
// Rebalance 策略
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
3. Группа потребителей
Обычные потребители обычно должны указать тему и смещение, чтобы указать потребление:
Например:
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
brokers := []string{"127.0.0.1:9092"}
master, err := sarama.NewConsumer(brokers, config)
consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetNewest)
в:
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
- topic
- partition
- offset
Но вообще в такой форме нужно указывать смещение, что неудобно в использовании. Поэтому обычно используют форму потребительских групп.
type KafkaConsumerGroupAction struct {
group sarama.ConsumerGroup
}
func NewKafkaConsumerGroupAction(brokers []string, groupId string) *KafkaConsumerGroupAction {
config := sarama.NewConfig()
sarama.Logger = log.New(os.Stdout, "[consumer_group]", log.Lshortfile)
// 重平衡策略
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
config.Consumer.Group.Session.Timeout = 20 * time.Second
config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
config.Consumer.IsolationLevel = sarama.ReadCommitted
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V2_3_0_0
consumerGroup, e := sarama.NewConsumerGroup(brokers, groupId, config)
if e != nil {
log.Println(e)
return nil
}
return &KafkaConsumerGroupAction{group: consumerGroup}
}
func (K *KafkaConsumerGroupAction) Consume(topics []string, wg sync.WaitGroup, ctx context.Context) {
var consumer = KafkaConsumerGroupHandler{ready: make(chan bool)}
go func() {
defer wg.Done()
for {
if err := K.group.Consume(ctx, topics, &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready
log.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
wg.Wait()
if err := K.group.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
type KafkaConsumerGroupHandler struct {
ready chan bool
}
func (K *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (K *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (K *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partions = %d, offset = %d", string(message.Value), message.Timestamp, message.Topic, message.Partition, message.Offset)
lag := claim.HighWaterMarkOffset() - message.Offset
fmt.Println(lag)
session.MarkMessage(message, "")
}
return nil
}
Группа потребителей:
type ConsumerGroup interface {
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
Errors() <-chan error
Close() error
}
в:
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
Для реальной обработки сообщений необходимо реализовать интерфейс ConsumerGroupHandler.
4. Общий поток обработки производителя
Если вам понятны эти концепции, в чем сложность использования kafka в целом?
- Как убедиться, что сообщения отправляются точно
- Как убедиться, что сообщения не используются повторно
- Как сделать так, чтобы сообщение не лагало, лучше всего, чтобы производитель отправлял его в систему сообщений, а потребитель потреблял его сразу без расширения
- Как обеспечить высокую доступность системы
- Конфигурация производителя
- создать экземпляр производителя
- создать сообщение
- отправлять сообщения
- Закройте экземпляр производителя
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
client, err := NewClient(addrs, conf)
if err != nil {
return nil, err
}
return newAsyncProducer(client)
}
//异步生产者
type AsyncProducer interface {
AsyncClose()
Close() error
Input() chan<- *ProducerMessage // 发送消息
Successes() <-chan *ProducerMessage
Errors() <-chan *ProducerError
}
5. Общий процесс обработки для потребителей
Общий поток обработки потребителей:
- Конфигурация потребителя
- создать экземпляр потребителя
- Подписывайтесь на темы
- Совершить смещение
- близкий потребитель
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
client, err := NewClient(addrs, config)
if err != nil {
return nil, err
}
return newConsumer(client)
}
type Consumer interface {
Topics() ([]string, error) // 消息
Partitions(topic string) ([]int32, error) // 分区
ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // 消费消息
HighWaterMarks() map[string]map[int32]int64 // 高水位
Close() error
}
6. Общий поток обработки групп потребителей
Обычные потребители должны указывать разделы и смещения для потребления, которые обычно не используются. Обычно выбирают группу потребителей.
Итак, каков общий процесс обработки группы потребителей?
- Настроить группы потребителей
- Группа потребителей экземпляра, укажите тему, укажите GroupID группы потребителей
- Использование сообщений
- закрытая группа потребителей
type ConsumerGroup interface {
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
Errors() <-chan error
Close() error
}
Процессор потребительской группы:
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
7. Кластер
Как упоминалось выше, одной из характеристик кластера является наличие нескольких машин и нескольких служб.
В реальной онлайн-среде zookeeper развернут на разных машинах, а сервер kafka развернут на разных машинах, образуя систему, которая вместе обслуживает онлайн-систему.
Личное обучение, чтобы добиться эффекта кластеризации, то есть: использовать разные порты для различения.
Конечно, вы можете настроить zookeeper и kafka локально. Но мне вообще нравится использовать контейнерный метод, который удобно разворачивать.
- многоузловой зоопарк
zookeeper-1:
image: zookeeper
restart: always
hostname: zookeeper-1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
volumes:
- /local/volumn/zookeeper1/data:/data
- /local/volumn/zookeeper1/datalog:/datalog
zookeeper-2:
image: zookeeper
restart: always
hostname: zookeeper-2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
volumes:
- /local/volumn/zookeeper2/data:/data
- /local/volumn/zookeeper2/datalog:/datalog
zookeeper-3:
image: zookeeper
restart: always
hostname: zookeeper-3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
volumes:
- /local/volumn/zookeeper3/data:/data
- /local/volumn/zookeeper3/datalog:/datalog
Наиболее важными из них являются переменные среды:
ZOO_MY_ID 一般用一个数字表示 myid
ZOO_SERVERS
Абстрактная формула: server.A=B:C:D
- А означает myid, что означает номер сервера
- B представляет собой IP-адрес, представляющий сервер
- C представляет собой порт, через который сервер обменивается информацией с ведущим сервером в кластере.
- D представляет собой порт, через который серверы взаимодействуют друг с другом во время выборов.
Некоторые люди скажут: я не знаю, что делать с этими переменными среды, и я не знаю конкретных имен переменных среды?
См. конкретную документацию на хабе Docker:
Документация докер-хаба zookeeper:hub.docker.com/_/zookeeper
- Многоузловая кафка: (адрес концентратора докеров кафки:Hubei.docker.com/day/pollution экологическая еда…)
kfk1:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk1
hostname: kfk1
restart: always
ports:
- 9092:9092
- 19999:9999
expose:
- 19092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk1:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk1:/kafka/kafka-logs-kfk1
kfk2:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk2
hostname: kfk2
restart: always
ports:
- 29092:29092
- 29999:9999
expose:
- 29092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk2:29092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk2:/kafka/kafka-logs-kfk2
kfk3:
image: index.docker.io/wurstmeister/kafka:latest
container_name: kfk3
hostname: kfk3
restart: always
ports:
- 39092:39092
- 39999:9999
expose:
- 39092
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk3:39092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092
JMX_PORT: 9999
volumes:
- /local/volumn/kfk3:/kafka/kafka-logs-kfk3
Наиболее важными из них являются следующие переменные среды:
KAFKA_BROKER_ID broker.id 单节点时,默认值为-1
KAFKA_ZOOKEEPER_CONNECT kafka zookeeper 连接地址,对应上文 zookeeper 对外地址
KAFKA_ADVERTISED_LISTENERS 该节点对外公布的访问地址和端口
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092 该节点对外访问地址和端口
- Узел мониторинга (адрес докер-хаба kafka-manager:Hubei.docker.com/day/убить овец…)
ui:
image: index.docker.io/sheepkiller/kafka-manager:latest
restart: always
links:
- zookeeper-1
- zookeeper-2
- zookeeper-3
- kfk1
- kfk2
- kfk3
ports:
- 9000:9000
environment:
ZK_HOSTS: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_BROKERS: kfk1:19092,kfk2:29092,kfk3:39092
Где переменные среды:
ZK_HOSTS zookeeper 节点地址
KAFKA_BROKERS kafa 节点地址
запускать:
docker-compose -f docker-compose.yml up -d
Кластерная версия службы kafka в основном такая же, как и служба kafka с одним узлом.Кластерная версия системы более надежна и высокодоступна, например, с избыточным резервным копированием.Сбой узла не влияет на службу, если только все узлы выходят из строя.
- Резервный:
Для создания темы количество резервных копий меньше или равно количеству узлов кафки. Например, есть три узла, две резервные копии, а на трех узлах могут быть любые два.
- раздел
Для одного узла все разделы топика находятся в одной папке, для кластерного варианта разделы могут быть примерно равномерно распределены по узлам кластера
Внешний сервис точно такой же, как у одного узла.
Тема-го 10 разделов, 2 резервные копии: три узла хранятся отдельно: 6, 7, 7 разделы
Возможные проблемы с версией кластера?
-
Было установлено, что темы не создаются автоматически, не забудьте сначала создать темы вручную.
-
Адрес доступа к кластеру заблокирован. 1 Настройте /etc/hosts 2 Откройте порты, особенно облачные серверы, не забудьте открыть порты
-
Лаги расхода Лаги, что делать? Добавить экземпляр потребителя
Ссылаться на:
Кодовый адрес: