Действительно, достаточно понять Кафку и прочитать эту статью!

задняя часть база данных Kafka

Привет всем, меня зовут Yunqi.

Некоторые люди говорят, что в мире есть три великих изобретения: огонь, колеса и кафка.

На сегодняшний день Apache Kafka, несомненно, пользуется большим успехом: Confluent утверждает, что треть из 500 крупнейших компаний мира используют Kafka. Сегодня я поделюсь с вами знаниями, связанными с Kafka, высокой производительностью, устойчивостью, резервным копированием с несколькими копиями, горизонтальным расширением...

Текст объемом 10 000 слов, будьте готовы, рекомендуется сначала собрать его, а потом прочитать!

1. Почему существует система сообщений?

  1. Развязка

  2. Асинхронная обработка Например, платформы электронной коммерции, действия seckill. Общий процесс будет разделен на: 1:风险控制,2:库存锁定, 3:生成订单, 4:短信通知, 5:更新数据

  3. Бизнес-деятельность seckill разделена через систему сообщений, и бизнес, который не требует срочного решения, будет решаться медленно; процесс изменяется на: 1:风险控制,2:库存锁定, 3:消息系统, 4:生成订单, 5:短信通知, 6:更新数据

  4. Управление потоком: 1. После того, как шлюз получит запрос, он помещает запрос в очередь сообщений 2. Серверная служба получает запрос из очереди сообщений и завершает последующий процесс обработки всплеска. Затем вернуть результат пользователю. Плюсы: Контролирует поток Минусы: Замедляет процесс

2. Основные понятия Кафки

режиссер: Производитель генерирует данные для кластера Kafka.

потребитель: Consumer для получения данных, обработки данных и потребительских данных, потребительских данных, для извлечения данных внутри Kafka потребителями.

тема:тема

раздел:partition По умолчанию топик имеет один раздел (раздел), и вы можете настроить несколько разделов самостоятельно (разделы разбросаны и хранятся на разных узлах сервера)

3. Кластерная архитектура Kafka

В кластере Kafka сервер kafka является брокером, тема — это только логическое понятие, а раздел — это каталог на диске.

Consumer Group: Группа потребления При использовании данных необходимо указать идентификатор группы Указать идентификатор группы Предполагая, что программа A и программа B имеют одинаковый номер идентификатора группы, две программы принадлежат к одной и той же группе потребления.

специальный: Например, если есть тема Topic A и программа A использует эту тему A, то программа B больше не может использовать тему A (программа A и программа B принадлежат к одной и той же группе потребителей); например, программа A уже использовала данные в тема A, и все еще невозможно повторно использовать данные темы A снова, но после повторного назначения номера идентификатора группы они могут быть использованы. Между различными группами потребителей нет никакого влияния, группу потребителей необходимо настроить, а программа имени потребителя создается автоматически (уникальная).

Controller: главный узел в узле Kafka с помощью zookeeper.

4. Последовательная запись на диск Kafka гарантирует производительность записи данных

**kafka write data: **последовательная запись, при записи данных на диск происходит добавление данных, операция случайной записи отсутствует.Опыт: Если диск сервера достигает определенного числа и диск также достигает определенного числа оборотов, скорость последовательной записи (дозаписи) данных на диск аналогична скорости записи в память生产者生产消息,经过kafka服务先写到os cache 内存中,然后经过sync顺序写到磁盘上。

5. Механизм нулевого копирования Kafka обеспечивает высокую производительность чтения данных

Поток данных чтения потребителя:

  1. Потребитель отправляет запрос в сервис kafka

  2. Сервис kafka обращается к кэшу ОС для чтения данных (если кэша нет, он переходит к диску для чтения данных)

  3. Чтение данных с диска в кеш ОС

  4. os cache копирует данные в приложение kafka

  5. Kafka отправляет данные (копию) в кеш сокета

  6. Кэш сокета передается потребителю через сетевую карту.

    图片

Технология kafka linux sendfile - нулевая копия

1. Потребитель отправляет запрос в службу kafka 2. Служба kafka обращается к кэшу ОС для чтения данных (если кэша нет, она переходит к диску для чтения данных) 3. Данные считываются из диск в кеш-кэш ОС 4. Кэш ОС напрямую передает данные. Отправить на сетевую карту. 5. Передача данных потребителю через сетевую карту.

图片

6. Сегментированное хранилище журналов Kafka

Для топика в Кафке вообще задаются разделы например создается топикtopic_a, а затем указал, что эта тема при создании имеет три раздела. Фактически на трех серверах будет создано три каталога. Сервер 1 (kafka1) создает каталог theme_a-0:. Ниже каталога находится наш файл (хранимые данные), данные kafka — это сообщение, а данные хранятся в файле журнала. Конец .log — это файл журнала, а файл данных называется файлом журнала в kafka.По умолчанию в разделе находится n нескольких файлов журнала (сегментированное хранилище), а файл журнала по умолчанию имеет размер 1 ГБ..

图片

Сервер 2 (kafka2): Создайте каталог theme_a-1: Сервер 3 (kafka3): Создайте каталог theme_a-2:

7. Данные позиционирования бинарного поиска Kafka

Каждое сообщение в Kafka имеет свое собственное смещение (относительное смещение), которое существует на физическом диске In position Position: физическая позиция (где на диске), то есть сообщение имеет две позиции: offset: относительное смещение Shift ( относительное положение) position: физическое положение дискаРазреженный индекс:Kafka использует разреженный индекс для чтения индекса.Каждый раз, когда Kafka записывает журнал размером 4 КБ (.log), индекс записи записывается в индекс. Будет использован бинарный поиск.

图片

8. Дизайн сети с высокой параллельной работой (сначала разберитесь с NIO)

Часть дизайна сети - лучшая часть дизайна kafka, что также является причиной обеспечения высокой параллелизма и высокой производительности Kafka.Чтобы оптимизировать kafka, вы должны лучше понимать принцип kafka, особенно часть проектирования сети.

Шаблон проектирования сети Reactor 1:

图片

Шаблон проектирования сети Reactor 2:

图片

Шаблон проектирования сети Reactor 3:

图片

Проект сети сверхвысокого параллелизма Kafka:

图片

图片

9. Избыточные копии Kafka обеспечивают высокую доступность

В кафке есть реплики разделов, обратите внимание:До 0.8 не было механизма копирования. При создании топика можно указать раздел или указать количество реплик. Реплики имеют роли: раздел-лидер: 1. Операции записи и чтения данных выполняются из раздела-лидера. 2. Список ISR (in-sync-replica) будет поддерживаться, но значения в списке ISR будут удаляться по определенным правилам.Производитель отправляет сообщение.Сообщение должно быть сначала записано в раздел-лидер. Сообщение записывается в другие разделы в списке ISR.После записи сообщение считается отправленным в следующий раздел: синхронизируйте данные из ведущего раздела.

10. Отличное архитектурное мышление-резюме

Kafka — высокая степень параллелизма, высокая доступность, высокая производительность Высокая доступность: механизм многократного копирования Высокая степень параллелизма: проектирование сетевой архитектуры Трехуровневая архитектура: мультиселектор -> многопоточность -> проектирование очередей (NIO) Высокая производительность: запись данных:

  1. Сначала запишите данные в кэш ОС

  2. Запись на диск последовательная запись, высокая производительность

Чтение данных:

  1. Быстро находите данные для потребления на основе разреженного индекса.

  2. Механизм нулевого копирования Уменьшение копирования данных Уменьшение переключения контекста приложений и операционной системы

11. Создание производственной среды Kafka

11.1 Анализ сценария спроса

Платформа электронной коммерции должна ежедневно отправлять 1 миллиард запросов в кластер Kafka. Двадцать восемь. В любом случае это вообще не большая проблема после оценки. 1 миллиард запросов -> 24 пришло, при нормальных условиях большого объема данных с 12:00 до 8:00 утра каждый день нет. 80% запросов обрабатываются еще 16 часов. 16 часов обработки -> 800 миллионов запросов. 16 * 0,2 = 3 часа Обработано 80% из 800 миллионов запросов

То есть 600 миллионов данных обрабатываются за 3 часа. Давайте просто посчитаем количество запросов в секунду в пиковый период.6亿/3小时 =5.5万/s qps=5.5万

1 миллиард запросов * 50 КБ = 46 Тб требуется для хранения 46 Т данных в день.

При нормальных обстоятельствах мы создадим две копии46T * 2 = 92TДанные в Kafka имеют зарезервированный период времени, и самые последние3дней данных.92 т * 3 дня = 276 тЯ говорю о том, что 50 КБ не означает, что сообщение не 50 КБ (журналы объединены, и несколько журналов объединены вместе) Обычно сообщение составляет несколько байтов, а может быть и несколько сотен байт.

11.2 Оценка количества физических машин

1) Во-первых, проанализируйте, нужна ли вам виртуальная машина или физическая машина.При построении кластеров, таких как Kafka, mysql и hadoop, мы используем в производстве физические машины. 2) Общее количество запросов, которые необходимо обработать в пиковый период — 55 000 в секунду, по факту одна-две физические машины точно выдержат. При нормальных обстоятельствах, когда мы оцениваем машину, мы оцениваем ее по 4-кратному пиковому периоду. Если в 4 раза, то емкость нашего кластера должна быть готова к 200 000 qps. Такой кластер является относительно безопасным кластером. Требуется около 5 физических машин. Каждый может выдержать 40 000 запросов.

Резюме сценария:搞定10亿请求,高峰期5.5万的qps,276T的数据,需要5台物理机。

11.3 Выбор диска

Для обработки 1 миллиарда запросов, 55 000 запросов в секунду и 276 терабайт данных в пиковый период требуется 5 физических машин. 1) Твердотельный жесткий диск SSD, по-прежнему нужен обычный механический жесткий диск ** Жесткий диск SSD: более высокая производительность, но дорогой диск SAS: в некоторых аспектах производительность не очень хорошая, но относительно дешевая. Производительность жесткого диска SSD лучше, а это означает, что его производительность произвольного чтения и записи лучше. Подходит для кластеров, таких как MySQL. ** Но на самом деле его производительность при последовательной записи аналогична производительности SAS-дисков. Понимание Кафки: писать в порядке употребления. Поэтому мы используем обычный [机械硬盘] достаточно.

2) Нам нужно оценить, сколько дисков нужно каждому серверу, 5 серверов требуют в общей сложности 276T, и каждый сервер должен хранить 60T данных. В конфигурации сервера в нашей компании используется 11 жестких дисков, каждый из которых по 7Т. 11 * 7Т = 77Т

77T * 5 серверов = 385T.

Резюме сценария:

搞定10亿请求,需要5台物理机,11(SAS) * 7T

11.4 Оценка памяти

Получите 1 миллиард запросов, нужно 5 физических машин, 11 (SAS) * 7T

Мы обнаружили, что процесс чтения и записи данных в kafka основан на кеше ОС. Другими словами, если наш кеш ОС бесконечен, эквивалентна ли вся кафка работе на основе памяти? Если она основана на памяти, производительность должна быть очень хорошим. Память ограничена. 1) Как можно больше ресурсов памяти нужно отдавать в кеш ОС 2) Код Кафки написан на scala, а код клиента на java. Все основано на jvm. Поэтому мы должны отдать часть памяти jvm. Дизайн Kafka не помещает много структур данных в jvm. Так что нашему jvm не нужно слишком много памяти.По опыту достаточно дать 10G.

NameNode: метаданные (десятки гигабайт) тоже помещаются в jvm, и JVM должна отдавать большой объем. Например, дать 100G.

Предположим, у нас есть 10 запросов на этот проект, всего будет 100 тем. 100 тема * 5 раздел * 2 = 1000 раздел Раздел на самом деле является каталогом на физическом компьютере, и в этом каталоге находится много файлов .log. .log — это файл данных хранилища.По умолчанию размер файла .log составляет 1 ГБ. Если мы хотим убедиться, что данные последних .log-файлов 1000 разделов находятся в памяти, производительность будет лучшей на данный момент. 1000 * 1G = 1000G памяти Нам нужно только поставить последний журнал, чтобы убедиться, что 25% последних данных в памяти. 250M * 1000 = 0,25 ГБ * 1000 = 250 ГБ памяти.

250 памяти / 5 = 50 ГБ памяти 50 ГБ + 10 ГБ = 60 ГБ памяти

64Г памяти, еще 4Г, операционной системе тоже нужна память? На самом деле JVM от Kafka не нужно давать целых 10G. Подсчитано, что 64G в порядке. Конечно, если вы можете дать сервер с 128 ГБ памяти, это лучше всего.

Когда я только оценивал, я использовал тему с 5 разделами, но если это тема с большим объемом данных, может быть 10 разделов.

Суммировать:搞定10亿请求,需要5台物理机,11(SAS) * 7T ,需要64G的内存(128G更好)

11.5 Оценка загрузки ЦП

Оцените, сколько ядер ЦП нужно каждому серверу (ресурсы очень ограничены)

Мы оцениваем, сколько процессоров нам нужно, исходя из того, сколько потоков выполняется в нашем сервисе. Поток должен полагаться на процессор для запуска. Если у нас будет больше потоков, но меньше ядер процессора, нагрузка на нашу машину будет очень высокой, и производительность не будет хорошей.

Оцените, сколько потоков будет запускать сервер Kafka после его запуска?

Поток-акцептор 1 поток процессора 3 6~9 потоков Обработка потоков запросов 8 32 потока Регулярная очистка потоков, извлечение потоков данных, регулярная проверка механизма списка ISR и т. д. Таким образом, после запуска службы Kafka будет более 100 потоков.

cpu core = 4, еще раз, десятки потоков точно завалят процессор. ядро процессора = 8, он должен легко поддерживать десятки потоков. Если у нас потоков больше 100 или почти 200, то 8 процессорных ядер не сделать. Поэтому мы предлагаем здесь: ядро ​​ЦП = 16. Если возможно, было бы лучше иметь 32 ядра процессора.

Вывод: кластер kafka должен иметь как минимум 16 ядер процессора, а было бы лучше, если бы он мог иметь 32 ядра процессора. 2 ЦП * 8 = 16 ядер ЦП 4 ЦП * 8 = 32 ядра ЦП

Суммировать:搞定10亿请求,需要5台物理机,11(SAS) * 7T ,需要64G的内存(128G更好),需要16个cpu core(32个更好)

11.6 Оценка потребностей сети

Оцените, какую сетевую карту нам нужно?Как правило, это либо гигабитная сетевая карта (1 Гбит/с), либо сетевая карта 10 Гбит/с (10 Гбит/с).

高峰期的时候 每秒会有5.5万的请求涌入,5.5/5 = 大约是每台服务器会有1万个请求涌入。
我们之前说的,
10000 * 50kb = 488M  也就是每条服务器,每秒要接受488M的数据。数据还要有副本,副本之间的同步
也是走的网络的请求。488 * 2 = 976m/s
说明一下:
   很多公司的数据,一个请求里面是没有50kb这么大的,我们公司是因为主机在生产端封装了数据
   然后把多条数据合并在一起了,所以我们的一个请求才会有这么大。
   
说明一下:
   一般情况下,网卡的带宽是达不到极限的,如果是千兆的网卡,我们能用的一般就是700M左右。
   但是如果最好的情况,我们还是使用万兆的网卡。
   如果使用的是万兆的,那就是很轻松。


11.7 Кластерное планирование

Количество запросов, планирование количества физических машин, анализ количества дисков и выбор того, какую дисковую память, процессорную сетевую карту использовать, - это сказать всем, что если в будущем в компании возникнет спрос, оцените ресурсы. и серверы. Следите за моими идеями, чтобы оценить

Размер сообщения 50kb -> 1kb 500byte 1MIP-имя хоста 192.168.0.100 hadoop1 192.168.0.101 hadoop2 192.168.0.102 hadoop3

Планирование хоста: при архитектуре кластера kafka: архитектура master-slave: контроллер -> управление метаданными всего кластера через кластер zk.

  1. Кластер zookeeper hasoop1 hadoop2 hadoop3

  2. кластер kafka Теоретически мы не должны устанавливать службы kafka вместе с службами zk. Но у нас здесь ограниченное количество серверов. Итак, наш кластер kafka также установлен в hadoop1 haadoop2 hadoop3

12. Эксплуатация и обслуживание Kafka

12.1 Введение в общие инструменты O&M

KafkaManager — инструмент управления страницами

12.2 Общие команды O&M

сцена первая:Объем данных темы слишком велик, количество тем необходимо увеличить

При создании темы в начале объем данных был не большой, и количество разделов не большое.

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6
kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,ha

идентификатор брокера:

hadoop1:0 hadoop2:1 hadoop3:2 Предположим, что раздел имеет три копии: partition0: a,b,c

a: ведущий раздел b, c: подчиненный раздел

ISR:{a,b,c}如果一个follower分区 超过10秒 没有向leader partition去拉取数据,那么这个分区就从ISR列表里面移除。

Сценарий второй:Основная тема увеличить коэффициент копирования

Если вам нужно увеличить коэффициент копирования сценария vim test.json для основных бизнес-данных, сохраните следующую строку сценария json.

{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

Выполните приведенный выше json-скрипт:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

Сценарий третий:Тема с несбалансированной нагрузкой, ручная миграцияvi topics-to-move.json

{“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1} // 把你所有的topic都写在这里


kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate

Напишите сюда все свои машины включая только что добавленных брокеров, там будет сказано, что все разделы равномерно распределены на каждом брокере, включая вновь добавленных брокеров, в это время будет сгенерирован план миграции, который можно сохранить в файл. в: expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Этот вид операции переноса данных должен выполняться в ночное время с низкой пиковой нагрузкой, потому что при этом будут переноситься данные между машинами, что занимает много ресурсов полосы пропускания.–generate:Создайте план миграции на основе заданного списка тем и списка брокеров. generate фактически не выполняет перенос сообщений, а рассчитывает план переноса сообщений для команды execute. --execute: выполнить миграцию в соответствии с заданным планом миграции сообщений. --verify: проверить, было ли сообщение перенесено.

Сценарий четвертый:Если у лидера брокера слишком много разделов

Обычно наш лидерный раздел балансирует нагрузку между серверами. Hadoop1 4 Hadoop2 1 Hadoop3 1

Теперь каждая бизнес-сторона может подать заявку на создание темы самостоятельно, а количество разделов автоматически выделяется и динамически корректируется позже.Kafka сама автоматически распределяет ведущий раздел равномерно на каждой машине, что может гарантировать, что пропускная способность чтения и записи каждого машина едина.Однако есть исключения, то есть, если некоторые брокеры не работают, ведущий раздел будет слишком сконцентрирован на нескольких других брокерах, что приведет к высокой нагрузке на запросы чтения и записи нескольких брокеров, а другие Брокеры простоя будут перезапущены.После этого есть разделы-последователи, а запросы на чтение и запись очень низкие, что приводит к несбалансированной нагрузке кластера.Есть параметр auto.leader.rebalance.enable, который соответствует действительности по умолчанию.Лидер проверяется каждые 300 секунд(leader.imbalance.check.interval.seconds).Сбалансирована ли нагрузка.Если несбалансированный лидер на брокере превышает 10%,leader.imbalance.per.broker.percentage выберет этот Broker Параметры конфигурации: auto.leader.rebalance.enable по умолчанию является истинным лидером по умолчанию ignorer.per.broker.percentage: Процент несбалансированных лидеров, разрешенный для каждого брокера. Если каждый брокер превысит это значение, контроллер инициирует балансировку лидера. Это значение представляет собой процент. 10% Leader.imbalance.check.interval.seconds: значение по умолчанию — 300 секунд.

13. Кафка Продюсер

13.1 Принцип отправки сообщения производителем

图片

13.2 Принцип отправки сообщения производителем - демонстрация базового случая

图片

13.3 Как улучшить пропускную способность

Как улучшить пропускную способность: параметр:buffer.memory: Установите буфер для отправки сообщений, значение по умолчанию 33554432, что составляет 32 МБ Параметр два:compression.type: по умолчанию нет, без сжатия, но также можно использовать сжатие lz4, и эффективность по-прежнему хорошая.После сжатия объем данных может быть уменьшен, а пропускная способность может быть улучшена, но это увеличит нагрузку на ЦП на Сторона производителя Параметр 3:batch.size: Установите размер пакета. Если пакет слишком мал, это вызовет частые сетевые запросы и снизит пропускную способность; если пакет слишком велик, сообщение будет долго ждать отправки, и оно будет оказывают большое давление на буфер памяти. В памяти буферизуются несколько данных. Значение по умолчанию: 16384, что составляет 16 КБ, то есть пакет отправляется, когда он заполнен 16 КБ. Как правило, в реальной производственной среде , значение этого пакета может быть увеличено для повышения пропускной способности.Если пакет Если параметр слишком велик, будет задержка. Обычно устанавливается в соответствии с размером сообщения. Если у нас будет меньше новостей. Параметр linger.ms используется совместно, это значение по умолчанию равно 0, что означает, что сообщение должно быть отправлено немедленно, но это неправильно, обычно устанавливается значение 100 миллисекунд или подобное, это означает, что после отправки сообщения, он входит в пакет, если пакет заполнен 16 КБ в течение 100 миллисекунд, он будет отправлен естественным образом.

13.4 Как обрабатывать исключения

  1. LeaderNotAvailableException: это означает, что если машина зависнет, копия-лидер в это время будет недоступна, что приведет к сбою записи. Прежде чем продолжить запись, вы должны дождаться, пока другие копии-последователи переключятся на копию-лидер. можно повторить отправку; если вы обычно перезапускаете процесс брокера kafka, это обязательно приведет к переключению лидера, что обязательно заставит вас написать ошибку, которая является LeaderNotAvailableException.

  2. NotControllerException: Это тоже самое.Если Брокер, где находится Контроллер, зависнет, в это время будет проблема, и вам нужно дождаться переизбрания Контроллера.В это время это то же самое, что и повторная попытка .

  3. NetworkException: Тайм-аут сетевого исключения а. Настройте параметр повторных попыток, и он будет автоматически повторять попытку б. Но если после нескольких повторных попыток все еще произойдет сбой, нам будет предоставлено исключение. После того, как мы получим исключение, мы повторим попытку сообщения обрабатываются отдельно. У нас будут резервные ссылки. Неудачные сообщения отправляются в Redis, записываются в файловую систему или даже отбрасываются.

13.5 Механизм повторных попыток

Повторная попытка вызывает некоторые проблемы:

  1. сообщение повторяетсяИногда некоторые проблемы, такие как переключение лидера, необходимо повторить, и повторные попытки могут быть установлены, но повторная попытка сообщения вызовет проблему повторной передачи.Например, если джиттер сети заставляет его думать, что это не удалось, он попытается снова , Все были успешными.

  2. СообщениеПовторная попытка сообщения может привести к сообщениям не по порядку, потому что сообщения, которые могут находиться в очереди позади вас, отправляются. Таким образом, вы можете использовать для параметра «max.in.flight.requests.per.connection» значение 1, что гарантирует, что производитель может отправлять только одно сообщение за раз. Интервал между двумя повторными попытками по умолчанию составляет 100 миллисекунд. Чтобы установить его, используйте «retry.backoff.ms». В основном, в процессе разработки 95% ненормальных проблем могут быть решены с помощью механизма повторных попыток.

13.6 Подробное объяснение параметров ACK

На стороне производителя установлено значение request.required.acks=0, пока запрос отправлен, даже если он отправлен, ему все равно, успешна запись или нет. Производительность очень хорошая.Если вы анализируете какие-то журналы, вы можете допустить потерю данных.С этим параметром производительность будет очень хорошей. request.required.acks=1, отправьте сообщение, когда ведущий раздел успешно записан, запись выполнена успешно. Однако этот метод также имеет возможность потери данных. request.required.acks=-1; Это сообщение не будет успешно записано, пока все копии не будут записаны в список ISR. ИСР: 1 экз. 1 ведущий раздел 1 подчиненный раздел kafka server: min.insync.replicas: 1, если мы не установим его, значение по умолчанию равно 1. Ведущий раздел будет поддерживать список ISR, и это значение предназначено для ограничения количества ISR. в списке Копия, например, это значение равно 2, тогда когда в списке ISR всего одна копия. При вставке данных в этот раздел будет сообщено об ошибке. Разработайте решение, которое не теряет данные: Решение, которое не теряет данные: 1) Реплика раздела >=2 2)acks = -1 3)min.insync.replicas >=2 Также возможно, что будет отправлено исключение: обрабатывать исключение

13.7 Пользовательские разделы

Раздел: 1.ключ не установленНаши сообщения будут отправлены на разные разделы в ротации. 2,ключ установленРазделитель, который поставляется с kafka, вычислит хеш-значение на основе ключа, и это хеш-значение будет соответствовать определенному разделу. Если ключи совпадают, хэш-значение должно быть одинаковым, и одно и то же значение ключа должно быть отправлено в один и тот же раздел. Но в некоторых особых случаях нам нужно настроить раздел

public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List partitionInfoList = cluster.availablePartitionsForTopic(topic);
//获取到分区的个数 0,1,2
int partitionCount = partitionInfoList.size();
//最后一个分区
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}

Как использовать: настроить этот класс:props.put("partitioner.class", "com.zhss.HotDataPartitioner");

13.8 Полная демонстрация случая

14.1 Концепция группы потребителей Один и тот же идентификатор группы принадлежит к одной и той же группе потребителей 1) Каждый потребитель должен принадлежать к Consumer.group, которая является группой потребителей Один раздел темы будет назначен только одному потребителю в одной группе потребителей для Потребитель может выделить несколько разделов, или потребитель может не быть выделен ни для одного раздела 2) Если вы хотите добиться широковещательного эффекта, вам нужно использовать только другой идентификатор группы для потребления. тема A: раздел 0, раздел 1, группа A: потребитель 1: потребляющий раздел 0; другие потребители, и если он снова перезапустится, некоторые разделы будут возвращены ему

14, KAFKA потребителей

14.1 Концепция группы потребителей

Один и тот же идентификатор группы принадлежит к одной и той же группе потребителей 1) Каждый потребитель должен принадлежать к Consumer.group, которая является группой потребителей Раздел темы будет назначен только потребителю в группе потребителей для обработки, и каждый потребитель может может быть назначено несколько разделов, также возможно, что потребитель не назначен ни одному разделу. 2) Если вы хотите добиться эффекта трансляции, вам нужно использовать только разные идентификаторы групп для потребления. тема A: раздел 0, раздел 1, группа A: потребитель 1: потребляющий раздел 0; другие потребители, и если он снова перезапустится, некоторые разделы будут возвращены ему

14.2 Демонстрация базового случая

图片

14.3 Управление смещением

  1. Структура данных в памяти каждого потребителя сохраняет смещение потребления каждого раздела каждой темы, и смещение передается регулярно.Старая версия записывается в zk, но такой высокий одновременный запрос zk является неразумным архитектурным проектом, zk Распределенное Скоординированное, облегченное хранилище метаданных системы, которое не может отвечать за большое число одновременных операций чтения и записи, действует как хранилище данных.

  2. Теперь новая версия отправляет смещение и отправляет его во внутреннюю тему кафки: __consumer_offsets. При отправке в прошлом ключом является group.id+topic+номер раздела, а значением является значение текущего смещения. время от времени kafka будет сжимать эту тему внутри (слияние), то есть каждый номер group.id+topic+partition сохраняет последние данные.

  3. __consumer_offsets может получать много одновременных запросов, поэтому раздел по умолчанию равен 50 (ведущий раздел -> 50 kafka), поэтому, если ваша kafka развертывает большой кластер, например 50 машин, вы можете использовать 50 машин, чтобы противостоять давлению отправленных запросов смещения. потребитель-> сообщение данных-> диск-> увеличение смещения, чтобы начать потребление? -> смещение потребителя (смещение)

14.4 Введение в инструмент мониторинга смещения

  1. Программное обеспечение для управления веб-страницей (kafka Manager). Измените сценарий bin/kafka-run-class.sh и добавьте JMX_PORT=9988 в первую строку.перезапустить процесс кафки

  2. Другое программное обеспечение: смещение потребителя, который в основном отслеживается. Представляет собой jar-пакет java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –offsetStorage kafka\ (По версии: если в кафке есть смещение, то заполнить кафку, если есть zookeeper, заполните zookeeper) –zk hadoop1:2181 –порт 9004 –обновление 15 секунд – сохранение 2 дня.

14.5 Восприятие ненормального потребления

heartbeat.interval.ms: интервал сердцебиения потребителя, вы должны поддерживать пульсацию с координатором, чтобы узнать, неисправен ли потребитель, а затем, если возникает ошибка, команда перебалансировки будет выдана другим потребителям через сердцебиение, чтобы уведомить их для выполнения сеанса операции перебалансировки. будет считать это вычислительной мощностью потребления слишком слабой, и он будет исключен из группы потребления, а раздел будет выделен другим для потребления.Вообще говоря, его можно установить в соответствии с производительностью бизнес-обработки.

14.6 Объяснение основных параметров

fetch.max.bytes: Получить максимальное количество байтов сообщения. Обычно рекомендуется устанавливать большее значение. По умолчанию 1M. На самом деле, мы уже видели этот аналогичный параметр во многих местах, что означает, что максимальное размер сообщения может быть?

  1. Данные, отправляемые Продюсером, максимальный размер сообщения -> 10M

  2. Брокер хранит данные, максимальный размер, который может принять сообщение -> 10M

  3. Consumer max.poll.records: Максимальное количество сообщений, возвращаемых одним опросом.По умолчанию 500. connection.max.idle.ms: Если сокетное соединение между потребителем и брокером не используется в течение определенного периода времени, соединение будет автоматически восстановлено. Тем не менее, соединение сокета должно быть восстановлено для следующего потребления. Это предложение установлено на -1. Не перезапускать enable.auto.commit: включить автоматическую отправку смещения auto.commit.interval.ms: Как часто отправлять смещения, значение по умолчанию 5000 миллисекунд начало темы -> partition0:1000 partitino1:2000 last Когда есть отправленное смещение в каждом разделе, начать потребление с отправленного смещения; когда нет отправленного смещения, использовать вновь сгенерированные данные в разделе нет темы Когда есть отправленное смещение смещение в каждом разделе, начать потребление после смещения; выдает исключение, если для одного из разделов нет зафиксированного смещения

14.7 Подробное описание случая

Вводный случай: подержанная платформа электронной коммерции (Happy Send), в зависимости от количества потребления пользователем, накапливаются пользовательские звезды. Order System (Producer) -> Сообщения отправляются в кластер Kafka. Система членства (потребитель) -> Кластер Kafka потребляет сообщения и обрабатывает их.

14,8 Групповых координаторов Принципы координатора

Вопрос из интервью: Как потребители достигают баланса? — Реализовано по согласованию с координатором

  1. Что такое координатор Каждая группа потребителей выбирает брокера в качестве своего собственного координатора. Он отвечает за мониторинг сердцебиения каждого потребителя в этой группе потребителей, определение того, не работает ли он, и последующую активацию перебалансировки.

  2. Как выбрать машину-координатор Сначала хэшируем groupId (число), затем берем по модулю количество партиций в __consumer_offsets, по умолчанию 50, количество партиций в _consumer_offsets можно задать через offsets.topic.num.partitions, после поиск раздела, где находится раздел Машина-брокер является машиной-координатором. Например: groupId, "myconsumer_group" -> хеш-значение (число) -> по модулю 50 -> 8 __consumer_offsets Какой брокер является разделом № 8 этой темы, и этот брокер является координатором, поэтому он знает всех потребителей в эта группа потребителей. Когда потребитель отправляет смещение, в какой раздел отправляется смещение,

  3. Запуск процесса 1) Каждый потребитель отправляет запрос на присоединение к группе Координатору, 2) Затем Координатор выбирает потребителя из группы потребителей в качестве лидера, 3) Отправляет лидеру статус группы потребителей, 4) Затем лидер будет отвечать за формулирование плана потребления, 5) Отправьте его Координатору через SyncGroup 6) Затем Координатор отправляет план потребления каждому потребителю, и они начнут подключение к сокету и будут потреблять сообщения от ведущего брокера указанного раздела.

图片

14.9 стратегия перебалансировки

Группа потребителей реализует Rebalance координатором

Здесь есть три стратегии перебалансировки: ассортимент, круглый робин, липкий

Например, тема, которую мы потребляем, имеет 12 разделов: p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11 Предположим, что в нашей группе потребителей три потребителя.

  1. Стратегия диапазона Стратегия диапазона основана на диапазоне порядковых номеров p0 раздела.3 consumer1 p47 потребитель2 p8~11 потребитель3 — эта стратегия по умолчанию;

  2. Стратегия циклического перебора заключается в опросе и распределении потребитель1:0,3,6,9 потребитель2:1,4,7,10 потребитель3:2,5,8,11 Но есть проблема с двумя предыдущими схемами: 12 - > 2 Каждый потребитель потребляет 6 разделов

Предположим, что завис потребитель 1: p0-5 выделен потребителю 2, а p6-11 выделен потребителю 3. В этом случае разделы p6 и p7, первоначально расположенные на потребителе 2, выделены потребителю 3.

  1. Sticky Contategy Последняя липкая стратегия состоит в том, чтобы обеспечить, чтобы при перебалансировании перегородки, которые первоначально принадлежат потребителю, а затем равномерно их распространяют избыточные разбиения, чтобы поддерживать исходную стратегию распределения разбиения как можно больше

потребитель1: 0-3 потребитель2: 4-7 потребитель3: 8-11 Предположим, что потребитель3 вешает трубку потребитель1: 0-3, +8,9 потребитель2: 4-7, +10,11

15. Управление брокером

15.1 Значение Льва и hw

  1. Основной принцип Кафки

  2. Как оценить ресурс кластера

  3. Создайте набор кластеров kafka - «Представляет некоторые простые операции управления эксплуатацией и обслуживанием.

  4. Производитель (использование, основные параметры)

  5. Потребители (принцип, использование, основные параметры)

  6. Некоторые принципы внутри брокера

Основные понятия: LEO, HW LEO: Это связано со смещением смещения.

ЛЕО: В kafka и ведущий, и ведомый разделы называются репликами.

Каждый раз, когда раздел получает сообщение, он обновляет свой собственный LEO, который является смещением конца журнала. LEO на самом деле является последним смещением + 1.

HW: Высокий уровень воды LEO имеет очень важную функцию для обновления HW.Если LEO последователя и лидера синхронизированы, HW может обновлять данные перед HW и виден потребителям, а сообщение принадлежит коммиту. государство. Потребители сообщений после HW не могут их использовать.

15.2 Обновление Лео

图片

15.3 аппаратное обновление

图片

15.4 Как контроллер управляет всем кластером

1: /controller/id конкурирующего контроллера 2: Каталог, который слушает служба контроллера: /broker/ids/ используется для обнаружения брокера в сети и в автономном режиме /broker/topics/ Чтобы создать тему, мы создали команду темы. , предоставленные параметры и ZK-адрес. /admin/reassign_partitions Переназначение раздела  …图片

15.5 Отложенные задачи

Механизм отложенного планирования Kafka (расширенные знания) Давайте сначала посмотрим, где в kafka есть задачи, которые необходимо отложить. Первый тип отложенной задачи: например, если acks производителя = -1, он должен дождаться завершения записи как лидером, так и ведомым, прежде чем вернуть ответ. Таймаут есть, по умолчанию 30 секунд (request.timeout.ms). Следовательно, после записи куска данных на ведущий диск должна быть задача задержки, время истечения которой составляет 30 секунд.Задача задержки помещается в DelayedOperationPurgatory (менеджер задержки). Если все последователи будут записаны на локальный диск раньше, чем за 30 секунд, то задача будет автоматически запущена для пробуждения, и результат ответа может быть возвращен клиенту, в противном случае в самой задаче задержки указано максимум 30 секунд. , и если период тайм-аута не достигнут, исключение будет возвращено непосредственно в тайм-аут. Второй тип отложенной задачи: когда ведомый получает сообщения от ведущего, если он обнаруживает, что он пуст, он создаст отложенную тянущую задачу по истечении времени задержки (например, 100 мс) и вернет ведомому пустое сообщение. Затем ведомый отправляет запрос на чтение сообщения еще раз, но если лидер напишет сообщение во время задержки (менее 100 мс), задача автоматически проснется и автоматически выполнит задачу на вытягивание.

Необходимо запланировать большое количество отложенных задач.

15.6 Механизм времени колеса

  1. Какое колесо времени должно быть спроектировано? В Kafka много задач с задержкой, которые реализованы не на основе JDK Timer, временная сложность задач вставки и удаления O(nlogn), но реализована на основе написанного ею же колеса времени, а временная сложность составляет O(1), в зависимости от механизма колеса времени, задержки вставки и удаления задачи, O(1)

  2. Что такое колесо времени? На самом деле колесо времени представляет собой массив. tickMs: интервал колеса времени 1 мс wheelSize: размер колеса времени 20 interval: timckMS * wheelSize, общий промежуток времени колеса времени. 20 мс currentTime: Указатель текущего времени. а: поскольку колесо времени представляет собой массив, когда вы хотите получить данные из него, вы полагаетесь на индекс, а временная сложность равна O(1) б: сохраняется задача, соответствующая определенной позиции в массиве в двусвязном списке.Временная сложность вставки и удаления задач в двусвязном списке также O(1) Например: вставить задачу, которая будет выполняться через 8 мс на 19 мс 3. Многоуровневое колесо времени Например: в вставьте задачу для запуска через 110 мс. tickMs: интервал колеса времени 20 мс wheelSize: размер колеса времени 20 interval: timckMS * whellSize, общий промежуток времени колеса времени. 20 мс currentTime: Указатель текущего времени. Колесо в первый раз: 1 мс * 20 Колесо во второй раз: 20 мс * 20 Колесо в третий раз: 400 мс * 20

图片

Respect ~

Рекомендуемое чтение:

Прозрачный! Общие методы моделирования и демонстрации примеров в области хранилища данных

Orange Heart Preferred — интервью со старшим инженером Data Warehouse

О построении и оптимизации архитектуры хранилища данных и проектирования моделей

Комплексное интерпретация центра обработки данных, хранилища данных и озера данных

Архитектор | 10 вопросов о душе строительства хранилища данных

图片

Я "Юнь Ци", разработчик больших данных, который любит технологии и может писать стихи. Приветствую всех, кто обратил внимание!

云祁QI

Юнци ЦиЖизнь, море и море, пробивающееся сквозь волны.

В этой статье используетсяПомощник по синхронизации статейСинхронизировать