Всем привет, меня зовут Се Вэй, я программист.
Сегодняшняя тема: руководство по использованию kafka, одноузловая версия.
1. Сценарии использования
Если вы являетесь бэкенд-инженером, приложение, которое вы разработали, нормально работает в сети, а событие seckill внезапно приводит к сбою системы, а система проверки обнаруживает, что большой объем трафика не обрабатывается, что приводит к зависанию системы. есть два типа идей: 1. обратный прокси-сервер nginx, пересылка большего количества запросов на сервер во внутренней сети для обработки, чтобы достичь цели балансировки нагрузки 2. использовать систему сообщений для «кеширования» большего количества запросов с помощью промежуточного программного обеспечения, а затем кэшировать запросы постоянно извлекаются из этой системы для дальнейшей обработки.
Система сообщений, используемая последним, является сценарием использования kafka.
Так что же такое кафка?
Kafka представляет собой распределенную систему обмена сообщениями и в настоящее время позиционируется как распределенная стриминговая платформа.
Проще говоря, система A отправляет сообщение в систему сообщений, а система B получает сообщение из системы сообщений для последующей обработки.
Общее слово, используемое для описания сценариев приложений kafka: отсечение пиков и заполнение впадин, снижение пикового трафика, заполнение впадин трафика и обеспечение максимально возможной плавности системы.
Отсюда: три типичных сценария применения kafka
- система сообщений
- Система хранения
- Распределенная потоковая платформа
Система сообщений в настоящее время является наиболее широко используемым приложением; передача сообщений должна сохраняться для последующего извлечения системы, поэтому ее также можно использовать в качестве системы хранения; после извлечения сообщений они фактически предназначены для последующей обработки системой, так почему бы не включить данные обработка тоже?система kafka? Платформа распределенной обработки потоков, вероятно, означает это.
Основное приложение указано ниже: система обмена сообщениями
2. Основные понятия
Сообщение генерируется системой A и отправляется в систему сообщений, а система B извлекает его из системы сообщений, что включает в себя множество концепций.
- Система А называется производителем-производителем, и ее целью является отправка сообщений
- Система сообщений называется брокером, и ее суть заключается в том, что целью процесса обслуживания является получение сообщений от производителей, запросов на получение сообщений от потребителей и их сохранение.
- Система B называется потребителем-потребителем, целью которой является получение сообщений в системе сообщений.
Существуют разные параметры настройки для производителей и потребителей, которые определяют различное поведение производителей и потребителей.
Чтобы отправить сообщение, производитель должен сначала знать, куда его отправить, то есть знать адрес брокера, знать адрес брокера, а настройки брокера (сервера kafka) ограничивают адрес и другие варианты поведения постоянное хранилище.Кроме того, как быть с различными типами отправляемых сообщений? Система kafka использует логическую концепцию для этой концепции различения сообщений: Topic, то есть Topic, указанный производителем, отличается, и адрес хранения отличается.
Для топиков простой сценарий - в него непрерывно отправляется контент, а постоянное хранилище постоянно хранится в дополнительном режиме.В простых сценариях проблем нет.Проблема в том, что если данных сообщений слишком много, то это не способствует к потреблению системы. Это очень простая идея. Различные «файлы» дополнительно хранятся для уменьшения общего масштаба. Эта концепция называется разделом в кафке: раздел. Сообщения могут непрерывно отправляться на раздел в дополнительном режиме. Раздел имеет число, начальный бит равен 0, а сообщение Режим добавления сохраняется в разделе и дает смещение числа
Чтобы получать сообщения из системы брокера, потребители должны, во-первых, знать адрес брокера, а во-вторых, им нужно знать тему.Более подробно, вы можете указать, какой раздел и с какого смещения начинают потреблять сообщения.
Что делать, если новость потерялась? Простой способ — избыточное резервное копирование: репликация, несколько резервных копий, одна из которых является ведущей, а другая — ведомой, роль ведущей заключается в подключении к сообщению, ведомая не связана напрямую с сообщением, а отвечает только за связь с ведущим, непрерывная синхронизация данных.
Несколько брокеров образуют кластер kafka. В случае зависания системы kafka система kafka полагается на zookeeper для переизбрания для создания нового лидера.
kafka cluster:
kafka тема: Концепции разбиения
кафка кластер:
3. Использование клиента
Основываясь на приведенных выше концепциях: Итак, как создать службу Kafka для завершения системы сообщений?
- Запустить сервисный процесс: брокер
Поддельный код:
type Broker struct{
Addr
Config
...
}
- Производитель подключается к брокеру
Поддельный код:
type Producer struct{
Config
Message
...
}
- Потребитель подключается к брокеру
Поддельный код
type Consumer strcut{
Config
Topic
Partitions
Offset
...
}
Основная мысль:
- Запустить службу кафки
- Система А подключается к сервису и отправляет сообщение
- Система B подключается к службе и использует сообщения
В сочетании с примером официального сайта: Как выполнить самую простую отправку и получение сообщений.
Загрузите установочный пакет: kafka_2.12-2.3.0.tgz
- 2.12 относится к версии компилятора
- 2.3.0 относится к версии кафки
После распаковки остаются две наиболее важные директории:
- bin : серия скриптов, таких как запуск службы zookeeper, создание тем, отправка сообщений производителями, потребление сообщений потребителями и т. д.
zookeeper-server-start.sh
zookeeper-server-stop.sh
kafka-configs.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-consumer-groups.sh
kafka-topics.sh
kafka-server-start.sh
kafka-server-stop.sh
...
- config: файл конфигурации: например, настройка порта zookeeper, настройка каталога хранения журнала kafka, внешнего порта, максимальной емкости сообщений, частого сохранения и т. д.
zookeeper.properties
server.properties
producer.properties
consumer.properties
...
Около 200 параметров, извините, не помню. Что делать тогда? Если вы не будете учиться, вы не сможете зарабатывать деньги или повышать зарплату.
Основные настройки по умолчанию, частично по категориям:
- zookeeper.properties
kafka полагается на распределенную координацию zookeeper
dataDir=/tmp/zookeeper
clientPort=2181
Помните этот клиентский порт по умолчанию = 2181
- server.properties
кафка серверная служба
log.dirs=/tmp/kafka-logs //日志存储目录
log.retention.hours=168 // 日志存储时长
broker.id=0 // 默认 broker id,集群方式的 kafka 设置,给每个 broker 编号
listeners=PLAINTEXT://:9092 // 对外提供的服务入口地址
zookeeper.connect=localhost:2181 // ZooKeeper集群地址
...
- producer.properties
Содержание контрактных сообщений и т. д.
- consumer.properties
Согласовать содержание сообщений о потреблении и т. д.
После настройки параметров конфигурации:
- начать смотритель зоопарка
> bin/zookeeper-server-start.sh config/zookeeper.properties
- Запустите сервисный процесс kafka
> bin/kafka-server-start.sh config/server.properties
Для создания тем, запросов тем и т. д. можно использовать: kafka-topics.sh
Производитель может использовать: kafka-console-producer.sh
Потребители могут получать сообщения, используя: kafka-console-consumer.sh
Конечно, эти операции обычно используются только для тестирования, а фактическое использование заключается в использовании клиента, соответствующего языку.
4. Демонстрация
клиент версии kafka go:
Загрузить и установить:
go get -u -v github.com/Shopify/sarama
4.1 Производители
Система А
- режиссер
type KafkaAction struct {
DataSyncProducer sarama.SyncProducer
DataAsyncProducer sarama.AsyncProducer
}
// 同步方式
func newDataSyncProducer(brokerList []string) sarama.SyncProducer {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
config.Producer.Retry.Max = 5 // Retry up to 10 times to produce the message
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer1:", err)
}
return producer
}
// 异步方式
func newDataAsyncProducer(brokerList []string) sarama.AsyncProducer {
config := sarama.NewConfig()
sarama.Logger = log.New(os.Stdout, "[KAFKA] ", log.LstdFlags)
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Compression = sarama.CompressionSnappy // Compress messages
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
producer, err := sarama.NewAsyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer2:", err)
}
go func() {
for err := range producer.Errors() {
log.Println("Failed to write access log entry:", err)
}
}()
return producer
}
Помните, что производители имеют ряд параметров конфигурации? в конфиге есть эта функция, там есть значения по умолчанию, соответствующие значения вы можете выставить сами.
Например: алгоритм сжатия
config.Producer.Compression = sarama.CompressionSnappy
Обычно используемые алгоритмы сжатия:
- gzip
- snappy
- lz4
- zstd
Различные алгоритмы сжатия отличаются в основном степенью сжатия и пропускной способностью.
такие как правила раздела
config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
Часто используемые правила разделения:
- механизм опроса
- случайный раздел
- раздел по ключу
Например: возвращает ли отправка сообщения успех или нет
onfig.Producer.RequiredAcks = sarama.WaitForLocal
- Сообщение: Производитель только передает данные группы байтов.
интерфейс
type Encoder interface {
Encode() ([]byte, error)
Length() int
}
Отправляемое сообщение должно реализовывать интерфейс Encoder, то есть определенная структура сообщения должна реализовывать методы Encode и Length.
type SendMessage struct {
Method string `json:"method"`
URL string `json:"url"`
Value string `json:"value"`
Date string `json:"date"`
encoded []byte
err error
}
func (S *SendMessage) Length() int {
b, e := json.Marshal(S)
S.encoded = b
S.err = e
return len(string(b))
}
func (S *SendMessage) Encode() ([]byte, error) {
return S.encoded, S.err
}
- отправлять сообщения
func (K *KafkaAction) Do(v interface{}) {
message := v.(SendMessage)
// 发送的消息返回分区和偏移量
partition, offset, err := K.DataSyncProducer.SendMessage(&sarama.ProducerMessage{
Topic: TOPIC,
Value: &message,
})
if err != nil {
log.Println(err)
return
}
value := map[string]string{
"method": message.Method,
"url": message.URL,
"value": message.Value,
"date": message.Date,
}
fmt.Println(fmt.Sprintf("/%d/%d/%+v", partition, offset, value))
}
Например, мы отправляем сообщения в соответствии с приведенной выше конфигурацией: тема: тема-голанг раздел/смещение/значение
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/2/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/3/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
Выше есть только один раздел, и значение смещения продолжает увеличиваться.
Создайте еще одну тему, разделенную на 10 областей. тема: тема-питон
Как это выглядит в журнале?
// cd log.dirs ; server.properties 中的设置
topic-golang-0
topic-python-0
topic-python-1
topic-python-2
topic-python-3
topic-python-4
topic-python-5
topic-python-6
topic-python-7
topic-python-8
topic-python-9
Отправляем логи в топик-python и опрашиваем правила раздела:
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
Опрос, постоянная отправка сообщений в память раздела.
4.2 Потребители
Система Б
func main() {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
brokers := []string{"127.0.0.1:9092"}
master, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
_, e := master.Partitions("topic-python")
if e != nil {
log.Println(e)
}
consumer, err := master.ConsumePartition("topic-python", 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
doneCh := make(chan struct{})
go func() {
for {
select {
case err := <-consumer.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
fmt.Println("Received messages", string(msg.Key), string(msg.Value), msg.Topic)
case <-signals:
fmt.Println("Interrupt is detected")
doneCh <- struct{}{}
}
}
}()
<-doneCh
}
- Потребитель указывает тему: тема-python
- Раздел, указанный потребителем: 0
Помните сообщения, которые производители отправляют в топик-питон? раздел/смещение/значение
/0/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/2/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/3/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/4/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/5/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/6/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/7/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/8/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/9/0/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/0/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
/1/1/map[date:12344 method:get5 url:www.baidu.com4 value:da4]
Видно, что в разделе два сообщения: 0. Затем потребитель указывает раздел и может использовать только эти два сообщения.
Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
Received messages {"method":"get5","url":"www.baidu.com4","value":"da4","date":"12344"} topic-python
4.3 Другое
Какие еще функции нам нужны при использовании клиента kafka?
- О создании темы, описании, удалении и т.д.
- Описание группы потребителей и т. д.
- Метаинформация: метаданные
type ClusterAdmin interface {
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
ListTopics() (map[string]TopicDetail, error)
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
DeleteTopic(topic string) error
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
CreateACL(resource Resource, acl Acl) error
ListAcls(filter AclFilter) ([]ResourceAcls, error)
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
ListConsumerGroups() (map[string]string, error)
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
DeleteConsumerGroup(group string) error
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
Close() error
}
Это основные приложения одноузловой кафки.
5. Контейнерные услуги
Любая система, предоставляющая службы, может использовать версию контейнера, и Kafka также может использовать версию контейнера. Конфигурацию можно задать с помощью переменных среды.
docker-compose.yml
version: '2'
services:
ui:
image: index.docker.io/sheepkiller/kafka-manager:latest
depends_on:
- zookeeper
ports:
- 9000:9000
environment:
ZK_HOSTS: zookeeper:2181
zookeeper:
image: index.docker.io/wurstmeister/zookeeper:latest
ports:
- 2181:2181
server:
image: index.docker.io/wurstmeister/kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_OFFSETS_TOPIC_REPLIATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- распределенная система координации зоопарка
- сервер кафка сервис кафка
- kafka-manager платформа управления kafka
Последующие версии кластера.