Руководство по началу работы с Kafka: Cluster Edition

Kafka

Всем привет, меня зовут Се Вэй, я программист.

Сегодняшняя тема: Руководство по началу работы с Kafka — Cluster Edition

Обзор помещения:кафка с одним узлом

1. Основные понятия

В системе сообщений задействованные концепции относительно схожи.При изучении системы сообщений концепции иногда плохо понимаются, и читатели должны возвращаться и неоднократно разъяснять основные концепции в соответствии с их собственным прогрессом в обучении.

Основные понятия представлены в формате вопросов и ответов ниже:

  1. Что такое брокер?

Проще говоря, сервер kafka — это брокер.

  1. Что такое продюсер-продюсер?

Проще говоря, система, предоставляющая сообщения, называется производителем.

  1. Что такое потребительский потребитель?

Проще говоря, система, которая обрабатывает сообщения, называется потребителем.

  1. Что такое тема?

Проще говоря, чтобы различать разные типы сообщений, даются искусственные имена, поэтому тема является логическим понятием.

  1. Что такое раздел раздела?

Проще говоря, это сущность, хранящая сообщения, то есть разделяющая тему на разные разделы. На физическом уровне это папка, названная по теме-N, и журнал сообщений хранится в папке. Конечно, партиция может быть на одном брокере или на разных брокерах, если вы используете кластерную версию kafka.

topic-go-0
topic-go-1
topic-go-2
...
  1. Что такое смещение?

Проще говоря, это число, представляющее смещение. Используется для обозначения потребителей. Например, если я отправлю вам 100 сообщений, как я узнаю, сколько вы израсходовали?

  1. Что такое потребительская группа?

Проще говоря, группа потребителей вместе потребляет одну или несколько тем.Конечно, потребитель потребляет сообщения в одном или нескольких разделах. Зачем нужны потребители и группы потребителей? Чтобы использовать сообщения, потребители должны подписаться на тему, а группа потребителей использует одну или несколько тем одновременно.Эффектом этого является масштабируемость и отказоустойчивость. Масштабируемость означает, что добавление нового потребителя может взять на себя некоторые задачи и снизить нагрузку на других потребителей; точно так же уменьшить потребителя и перераспределить сообщения потребителям. Этот механизм распределения называется в системе kafka: Rebalance, динамическая настройка.

Так когда будет ребаланс?

  • Изменение количества потребителей
  • смена темы
  • изменения зонирования

Среди них изменение количества потребителей является наиболее распространенным сценарием. Ребаланс имеет свои преимущества и недостатки Достоинства: масштабируемость и отказоустойчивость Недостатки: Ребаланс потребляет больше производительности и в определенный момент перестанет потреблять сообщения.

  1. Что такое кафка-кластер?

Проще говоря, кластер — это набор сервисов, типичная особенность: несколько машин и несколько сервисов. Эта функция может обеспечить высокую доступность и высокий уровень параллелизма системы. Внутри системы могут обнаруживать друг друга через zookeeper, метаданные и т. д., внешне это похоже на использование одного сервиса.

  1. Как контролировать размер «емкости»?

Файлы конфигурации, например, как я могу гарантировать, что производитель может отправлять сообщения точно, например, несколько разделов, какой стратегии разделения я придерживаюсь, например, следует ли сжимать сообщения производителя и какой метод сжатия использовать; например, потребители потребляют от самого последнего или самого старого потребления сообщений; например, какова стратегия перебалансировки группы потребителей?

Эти характеристики, я называю это размером способности, размером этих способностей, пользователь должен быть достаточно хорошо знаком, чтобы играть своими способностями или уметь анализировать конкретные проблемы.

  • Конфигурация «возможностей» брокера
  • Конфигурация «возможностей» производителя
  • Конфигурация потребительских «возможностей»
  • Конфигурация группы потребителей «Возможности»

2. Конфигурация

Файл конфигурации при запуске службы также является общим способом запуска большинства служб, таких как службы базы данных MySQL, такие как службы Redis и т. д., которые все настраиваются при запуске, чтобы предоставить им возможности.

broker

# 目录
config/server.properties
  • каталог хранения сообщений log.dirs, может быть несколько
log.dirs=/kafka/kafka-logs-kfk1
  • zookeeper.connect , может быть несколько, используется в кластерном режиме
zookeeper.connect=zookeeper-1:2181
  • рекламируемый.listeners внешний адрес
advertised.listeners=PLAINTEXT://kfk1:9092
  • протокол безопасности listener.security.protocol.map
listener.security.protocol.map=CONTROLLER:PLAINTEXT

Как правило, эти конфигурации подходят, другие значения по умолчанию, среди которых log.dirs, zookeeper.connect, являются наиболее важными.

topic

  • auto.create.topics.enable разрешить ли автоматическое создание тем
auto.create.topics.enable=false

После запуска службы, как правило, через клиентский инструмент, напишите код для завершения соответствующих настроек.

В go клиент kafka использует:sarama

type config struct {
    Producer struct {
        ...
    }
    Consumer struct {
        ...
        Group struct {
        ...
        }
    }
}
  • Настройте для потребителей, настройте config.Consumer
  • Настройте для производителя, настройте config.Producer
  • Настройте группы потребителей, настройте config.Consumer.Group

потребитель:

	c.Consumer.Fetch.Min = 1
	c.Consumer.Fetch.Default = 1024 * 1024
	c.Consumer.Retry.Backoff = 2 * time.Second
	c.Consumer.MaxWaitTime = 250 * time.Millisecond
	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
	c.Consumer.Return.Errors = false
	c.Consumer.Offsets.CommitInterval = 1 * time.Second
	c.Consumer.Offsets.Initial = OffsetNewest
	c.Consumer.Offsets.Retry.Max = 3

Среди них общий по умолчанию, иначе настройте:

  • Возвращать ли ошибки: c.Consumer.Return.Errors
  • Начальное значение потребления: c.Consumer.Offsets.Initial
  • Механизм повтора: Повторить

Режиссер:

    //  消息的最大值大概 1MB
	c.Producer.MaxMessageBytes = 1000000
    // 消息是否应答:0: 不应答,禁用;1: leader 收到即可 ; -1: 所有的副本都收到
	c.Producer.RequiredAcks = WaitForLocal
    
	c.Producer.Timeout = 10 * time.Second
    
    // 分区策略:随机、轮询、hash 等
	c.Producer.Partitioner = NewHashPartitioner
    // 重试机制
	c.Producer.Retry.Max = 3
	c.Producer.Retry.Backoff = 100 * time.Millisecond
	c.Producer.Return.Errors = true
    
    // 压缩算法:gzip, zstd, lz4, snappy
	c.Producer.CompressionLevel = CompressionLevelDefault

Группа потребителей:

    // 间隔
	c.Consumer.Group.Session.Timeout = 10 * time.Second
    
    // 心跳
	c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
    
    // Rebalance 策略
	c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
	c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
	c.Consumer.Group.Rebalance.Retry.Max = 4
	c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second

3. Группа потребителей

Обычные потребители обычно должны указать тему и смещение, чтобы указать потребление:

Например:

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	brokers := []string{"127.0.0.1:9092"}
	master, err := sarama.NewConsumer(brokers, config)
    consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetNewest)

в:

ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
  • topic
  • partition
  • offset

Но вообще в такой форме нужно указывать смещение, что неудобно в использовании. Поэтому обычно используют форму потребительских групп.

type KafkaConsumerGroupAction struct {
	group sarama.ConsumerGroup
}

func NewKafkaConsumerGroupAction(brokers []string, groupId string) *KafkaConsumerGroupAction {
	config := sarama.NewConfig()
	sarama.Logger = log.New(os.Stdout, "[consumer_group]", log.Lshortfile)
	// 重平衡策略
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
	config.Consumer.Group.Session.Timeout = 20 * time.Second
	config.Consumer.Group.Heartbeat.Interval = 6 * time.Second
	config.Consumer.IsolationLevel = sarama.ReadCommitted
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V2_3_0_0
	consumerGroup, e := sarama.NewConsumerGroup(brokers, groupId, config)
	if e != nil {
		log.Println(e)
		return nil
	}
	return &KafkaConsumerGroupAction{group: consumerGroup}

}

func (K *KafkaConsumerGroupAction) Consume(topics []string, wg sync.WaitGroup, ctx context.Context) {
	var consumer = KafkaConsumerGroupHandler{ready: make(chan bool)}
	go func() {
		defer wg.Done()
		for {
			if err := K.group.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()
	<-consumer.ready
	log.Println("Sarama consumer up and running!...")
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	wg.Wait()
	if err := K.group.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

type KafkaConsumerGroupHandler struct {
	ready chan bool
}

func (K *KafkaConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

func (K *KafkaConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
func (K *KafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s, partions = %d, offset = %d", string(message.Value), message.Timestamp, message.Topic, message.Partition, message.Offset)
		lag := claim.HighWaterMarkOffset() - message.Offset
		fmt.Println(lag)
		session.MarkMessage(message, "")
	}

	return nil
}

Группа потребителей:

type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}

в:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error

	Cleanup(ConsumerGroupSession) error

	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

Для реальной обработки сообщений необходимо реализовать интерфейс ConsumerGroupHandler.

4. Общий поток обработки производителя

Если вам понятны эти концепции, в чем сложность использования kafka в целом?

  • Как убедиться, что сообщения отправляются точно
  • Как убедиться, что сообщения не используются повторно
  • Как сделать так, чтобы сообщение не лагало, лучше всего, чтобы производитель отправлял его в систему сообщений, а потребитель потреблял его сразу без расширения
  • Как обеспечить высокую доступность системы
  1. Конфигурация производителя
  2. создать экземпляр производителя
  3. создать сообщение
  4. отправлять сообщения
  5. Закройте экземпляр производителя
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}
//异步生产者
type AsyncProducer interface {

	AsyncClose()
	Close() error
	Input() chan<- *ProducerMessage // 发送消息
	Successes() <-chan *ProducerMessage
	Errors() <-chan *ProducerError
}

5. Общий процесс обработки для потребителей

Общий поток обработки потребителей:

  1. Конфигурация потребителя
  2. создать экземпляр потребителя
  3. Подписывайтесь на темы
  4. Совершить смещение
  5. близкий потребитель
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}
type Consumer interface {

	Topics() ([]string, error) // 消息
	Partitions(topic string) ([]int32, error) // 分区
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) // 消费消息
	HighWaterMarks() map[string]map[int32]int64 // 高水位

	Close() error
}

6. Общий поток обработки групп потребителей

Обычные потребители должны указывать разделы и смещения для потребления, которые обычно не используются. Обычно выбирают группу потребителей.

Итак, каков общий процесс обработки группы потребителей?

  1. Настроить группы потребителей
  2. Группа потребителей экземпляра, укажите тему, укажите GroupID группы потребителей
  3. Использование сообщений
  4. закрытая группа потребителей
type ConsumerGroup interface {
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
	Errors() <-chan error

	Close() error
}

Процессор потребительской группы:

type ConsumerGroupHandler interface {

	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

7. Кластер

Как упоминалось выше, одной из характеристик кластера является наличие нескольких машин и нескольких служб.

В реальной онлайн-среде zookeeper развернут на разных машинах, а сервер kafka развернут на разных машинах, образуя систему, которая вместе обслуживает онлайн-систему.

Личное обучение, чтобы добиться эффекта кластеризации, то есть: использовать разные порты для различения.

Конечно, вы можете настроить zookeeper и kafka локально. Но мне вообще нравится использовать контейнерный метод, который удобно разворачивать.

  • многоузловой зоопарк
  zookeeper-1:
    image: zookeeper
    restart: always
    hostname: zookeeper-1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper1/data:/data
      - /local/volumn/zookeeper1/datalog:/datalog
  zookeeper-2:
    image: zookeeper
    restart: always
    hostname: zookeeper-2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper2/data:/data
      - /local/volumn/zookeeper2/datalog:/datalog
  zookeeper-3:
    image: zookeeper
    restart: always
    hostname: zookeeper-3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
    volumes:
      - /local/volumn/zookeeper3/data:/data
      - /local/volumn/zookeeper3/datalog:/datalog

Наиболее важными из них являются переменные среды:

ZOO_MY_ID 一般用一个数字表示 myid
ZOO_SERVERS

Абстрактная формула: server.A=B:C:D

  • А означает myid, что означает номер сервера
  • B представляет собой IP-адрес, представляющий сервер
  • C представляет собой порт, через который сервер обменивается информацией с ведущим сервером в кластере.
  • D представляет собой порт, через который серверы взаимодействуют друг с другом во время выборов.

Некоторые люди скажут: я не знаю, что делать с этими переменными среды, и я не знаю конкретных имен переменных среды?

См. конкретную документацию на хабе Docker:

Документация докер-хаба zookeeper:hub.docker.com/_/zookeeper

  kfk1:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk1
    hostname: kfk1
    restart: always
    ports:
      - 9092:9092
      - 19999:9999
    expose:
      - 19092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk1:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk1:/kafka/kafka-logs-kfk1
  kfk2:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk2
    hostname: kfk2
    restart: always
    ports:
      - 29092:29092
      - 29999:9999
    expose:
      - 29092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk2:29092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk2:/kafka/kafka-logs-kfk2
  kfk3:
    image: index.docker.io/wurstmeister/kafka:latest
    container_name: kfk3
    hostname: kfk3
    restart: always
    ports:
      - 39092:39092
      - 39999:9999
    expose:
      - 39092
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kfk3:39092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092
      JMX_PORT: 9999
    volumes:
      - /local/volumn/kfk3:/kafka/kafka-logs-kfk3

Наиболее важными из них являются следующие переменные среды:

KAFKA_BROKER_ID  broker.id 单节点时,默认值为-1
KAFKA_ZOOKEEPER_CONNECT kafka zookeeper 连接地址,对应上文 zookeeper 对外地址
KAFKA_ADVERTISED_LISTENERS 该节点对外公布的访问地址和端口
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092 该节点对外访问地址和端口
  ui:
    image: index.docker.io/sheepkiller/kafka-manager:latest
    restart: always
    links:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
      - kfk1
      - kfk2
      - kfk3
    ports:
      - 9000:9000
    environment:
      ZK_HOSTS: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_BROKERS: kfk1:19092,kfk2:29092,kfk3:39092

Где переменные среды:

ZK_HOSTS zookeeper 节点地址
KAFKA_BROKERS kafa 节点地址

запускать:

docker-compose -f docker-compose.yml up -d

Кластерная версия службы kafka в основном такая же, как и служба kafka с одним узлом.Кластерная версия системы более надежна и высокодоступна, например, с избыточным резервным копированием.Сбой узла не влияет на службу, если только все узлы выходят из строя.

  • Резервный:

Для создания темы количество резервных копий меньше или равно количеству узлов кафки. Например, есть три узла, две резервные копии, а на трех узлах могут быть любые два.

  • раздел

Для одного узла все разделы топика находятся в одной папке, для кластерного варианта разделы могут быть примерно равномерно распределены по узлам кластера

Внешний сервис точно такой же, как у одного узла.

topic-go.png

Тема-го 10 разделов, 2 резервные копии: три узла хранятся отдельно: 6, 7, 7 разделы

broker.png

Возможные проблемы с версией кластера?

  • Было установлено, что темы не создаются автоматически, не забудьте сначала создать темы вручную.

  • Адрес доступа к кластеру заблокирован. 1 Настройте /etc/hosts 2 Откройте порты, особенно облачные серверы, не забудьте открыть порты

  • Лаги расхода Лаги, что делать? Добавить экземпляр потребителя

Ссылаться на:

Кодовый адрес: