Kafka битва сообщений триллионного уровня

Java задняя часть Kafka

1. Приложение Кафка

В этой статье в основном обобщаются возможности, необходимые для обеспечения высокой доступности, высокой надежности, высокой производительности, высокой пропускной способности и безопасной работы кластера, когда трафик кластера Kafka достигает триллионов записей в день, 10 триллионов записей в день или даже выше.

Сводка здесь в основном относится к версии Kafka 2.1.1, включая обновление версии кластера, миграцию данных, ограничение трафика, сигнализацию мониторинга, балансировку нагрузки, расширение/уменьшение кластера, изоляцию ресурсов, отказоустойчивость кластера, безопасность кластера, оптимизацию производительности, платформизацию, открытость. дефекты исходной версии, динамика сообщества и т. д. Эта статья в основном знакомит с основным контекстом, не вдаваясь в подробности. Давайте рассмотрим некоторые основные сценарии применения Kafka в качестве концентратора данных.

На следующем рисунке показаны некоторые основные процессы обработки данных, а Kafka играет роль концентратора данных.

Далее давайте взглянем на общую архитектуру нашей платформы Kafka;

Обновление версии 1.1

1.1.1 Как выполнять непрерывное обновление и откат версий с открытым исходным кодом

Адрес официального сайта:kafka.apache.org

1.1.1.2 Как обновить и откатить преобразование исходного кода

Потому что в процессе апгрейда неизбежно будет чередоваться логика старого и нового кода. Некоторые узлы в кластере являются версиями с открытым исходным кодом, а некоторые другие узлы — модифицированными версиями. Поэтому необходимо учитывать смешивание старого и нового кода в процессе обновления, как обеспечить совместимость и как вернуться в случае сбоя.

1.2 Миграция данных

Из-за архитектурных особенностей кластера Kafka это неизбежно приводит к несбалансированной нагрузке трафика внутри кластера, поэтому нам необходимо выполнить некоторую миграцию данных, чтобы реализовать баланс потоков между различными узлами кластера. Версия Kafka с открытым исходным кодом предоставляет инструмент сценариев для переноса данных.bin/kafka-reassign-partitions.sh«Вы можете использовать этот скрипт, если вы не реализуете автоматическую балансировку нагрузки самостоятельно.

План миграции, созданный скриптом, предоставленным версией с открытым исходным кодом, является полностью ручным вмешательством.Когда масштаб кластера очень велик, эффективность миграции становится очень низкой, и расчет обычно выполняется в днях. Конечно, мы можем реализовать программу автоматической балансировки.Когда балансировка нагрузки автоматизирована, мы в основном используем внутренний API, и программа помогает нам создавать планы миграции и выполнять задачи миграции. Следует отметить, что существует два типа планов миграции: указанный каталог данных и неуказанный каталог данных.Для указанного каталога данных необходимо настроить аутентификацию безопасности ACL.

Адрес официального сайта:kafka.apache.org

1.2.1 Миграция данных между брокерами

Не указывать каталог данных

//未指定迁移目录的迁移计划
{
    "version":1,
    "partitions":[
        {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]},
        {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]}
    ]
}

Укажите каталог данных

//指定迁移目录的迁移计划
{
    "version":1,
    "partitions":[
        {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]},
        {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}
    ]
}

1.2.2 Миграция данных между дисками в брокере

Серверы в производственной среде обычно монтируются с несколькими жесткими дисками, например, 4 блока/12 блоков и т. д.; тогда он может появиться в кластере Kafka, и трафик между брокерами относительно сбалансирован, но внутри брокера трафик между дисками не сбалансирован, в результате чего диск перегружается, что влияет на производительность и стабильность кластера и не позволяет эффективно использовать аппаратные ресурсы. В этом случае нам нужно сбалансировать нагрузку трафика нескольких дисков внутри брокера, чтобы трафик более равномерно распределялся по каждому диску.

1.2.3 Параллельная миграция данных

Инструмент миграции реплик «bin/kafka-reassign-partitions.sh», предоставляемый текущей версией Kafka с открытым исходным кодом (версия 2.1.1), может выполнять задачи последовательной миграции только в пределах одного кластера. В случае, когда несколько групп ресурсов физически изолированы в кластере, поскольку группы ресурсов не будут влиять друг на друга, но они не могут отправлять задачи миграции параллельно и дружественным образом, эффективность миграции немного низка, и эта проблема была решена. не решается до версии 2.6.0. Если вам нужно реализовать одновременную миграцию данных, вы можете обновить версию Kafka или изменить исходный код Kafka.

1.2.4 Прекращение переноса данных

Инструмент миграции реплик «bin/kafka-reassign-partitions.sh», предоставляемый текущей версией Kafka с открытым исходным кодом (версия 2.1.1), не может прервать миграцию после запуска задачи миграции. Когда задача миграции влияет на стабильность или производительность кластера, он становится беспомощным и может только ждать завершения задачи миграции (успеха или неудачи).Эта проблема не будет решена до версии 2.6.0. Если вам нужно прекратить миграцию данных, вы можете обновить версию Kafka или изменить исходный код Kafka.

1.3 Ограничения движения

1.3.1 Ограничения потока производства и потребления

Часто будет некоторое неожиданное, непредсказуемое потребление трафика или производство аномальных кластеров IO и других ресурсов будет генерировать огромное давление, и в конечном итоге влияет на стабильность и производительность всего кластера. Затем мы можем быть синхронизированы между ограничениями пользовательских дорожного движения по производству, потреблению, копии данных, ограничивающий текущий механизм не предназначен для ограничения пользователей, но избегать внезапного потока, повлиять на устойчивость и производительность кластера, пользователь может Лучший сервис.

Как показано на рисунке ниже, входящий трафик узла внезапно увеличился со 140 МБ/с до 250 МБ/с, а исходящий трафик увеличился примерно с 400 МБ/с до 800 МБ/с. Если в настоящее время нет механизма ограничения, несколько узлов в кластере будут подвержены риску попадания этого аномального трафика или даже вызовут лавины кластера.

Адрес официального сайта ограничения трафика на производство/потребление изображений:нажмите на ссылку

Для лимита потока производителей и потребителей официальный веб-сайт предоставляет следующие комбинации размеров для ограничения (конечно, следующий механизм ограничения потока имеет определенные дефекты, которые будут упомянуты позже в разделе «Функциональные дефекты версии Kafka с открытым исходным кодом»):

/config/users/<user>/clients/<client-id> //根据用户和客户端ID组合限流
/config/users/<user>/clients/<default>
/config/users/<user>//根据用户限流 这种限流方式是我们最常用的方式
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

При запуске брокерской службы Kafka необходимо включить настройку параметров JMX, что удобно для сбора различных JMX-индикаторов Kafka через другие приложения для мониторинга службы. Когда пользователю необходимо отрегулировать пороговое значение текущего лимита, выполняется интеллектуальная оценка в соответствии с трафиком, который может выдержать один брокер, без ручного вмешательства, чтобы определить, можно ли выполнить настройку; для ограничения пользовательского трафика основные индикаторы, которые необходимо упомянутые включают следующие два:

(1)消费流量指标:ObjectName:kafka.server:type=Fetch,user=acl认证用户名称 属性:byte-rate(用户在当前broker的出流量)、throttle-time(用户在当前broker的出流量被限制时间)
(2)生产流量指标:ObjectName:kafka.server:type=Produce,user=acl认证用户名称 属性:byte-rate(用户在当前broker的入流量)、throttle-time(用户在当前broker的入流量被限制时间)

1.3.2 Ограничение трафика лидера синхронизации/переноса данных ведомых устройств

Скопируйте адрес официального сайта ограничения трафика миграции/синхронизации:Связь

Задействованные параметры следующие:

//副本同步限流配置共涉及以下4个参数
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas

Вспомогательные показатели следующие:

(1)副本同步出流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2)副本同步入流量指标:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec

1.4 Мониторинг сигналов тревоги

Для мониторинга Kafka доступны некоторые инструменты с открытым исходным кодом, например:

Kafka Manager;

Kafka Eagle;

Kafka Monitor;

КафкаОфсетмонитор;

Мы встроили Kafka Manager в платформу в качестве инструмента для просмотра некоторых основных показателей, но эти инструменты с открытым исходным кодом не могут быть хорошо интегрированы в наши собственные бизнес-системы или платформы. Поэтому нам необходимо внедрить систему с большей детализацией, более интеллектуальным мониторингом и более точными сигналами тревоги. Охват мониторинга должен включать базовое оборудование, операционную систему (операционная система время от времени зависает в системном процессе, из-за чего брокер симулирует смерть и не может нормально предоставлять услуги), службу брокера Kafka, клиентское приложение Kafka, кластер zookeeper, а также весь вышестоящий и нисходящая цепь мониторинг дорог.

1.4.1 Аппаратный мониторинг

Мониторинг сети:

Основные индикаторы включают входящий сетевой трафик, исходящий сетевой трафик, потерю сетевых пакетов, повторную передачу по сети, количество TCP-соединений в течение TIME.WAIT, коммутаторы, пропускную способность машинного зала и мониторинг DNS-сервера (если DNS-сервер неисправен, черные дыры трафика может произойти, что приведет к провалу бизнеса в большой области) и т. д.

Мониторинг диска:

Основные индикаторы включают в себя мониторинг записи на диск, чтение с диска (если нет задержки в потреблении или только небольшая задержка, операция чтения с диска обычно не выполняется), disk ioutil, disk iowait (если этот индикатор слишком высок, нагрузка на диск велика), дисковое пространство для хранения, плохой диск на диске, плохой блок/плохой сектор диска (плохой сектор или плохой блок приведет к тому, что брокер окажется в полумертвом состоянии, а потребитель застрянет из-за проверки crc ), так далее.

Мониторинг процессора:

Отслеживайте скорость простоя/нагрузку ЦП, отказ материнской платы и т. д. Обычно низкое использование ЦП не является узким местом Kafka.

Мониторинг памяти/подкачки:

Использование памяти, сбои памяти. В общем, за исключением памяти кучи, выделяемой при запуске брокера Kafka, вся остальная память на сервере в основном используется как PageCache.

Мониторинг коэффициента попаданий в кэш:

Потому что, если диск читает Kafka, это сильно влияет на производительность, поэтому нам нужно следить за частотой попаданий в кеш PageCache Linux, если скорость попадания в кеш, то потребительская база попаданий в кеш.

Подробности читайте в статье: "Применение настройки кэша страниц Linux в Kafka".

Системный журнал:

Нам нужно отслеживать и предупреждать журнал ошибок операционной системы, чтобы вовремя обнаруживать некоторые аппаратные сбои.

1.4.2 Мониторинг брокерских услуг

Мониторинг службы брокера в основном осуществляется путем указания порта JMX при запуске службы брокера, а затем сбора индикаторов JMX путем реализации набора программ сбора индикаторов. (Адрес официального сайта серверного индикатора)

мониторинг на уровне брокера: процесс брокера, размер/количество записей входящего трафика брокера, размер байта исходящего трафика брокера/номер записи, входящий трафик синхронизации реплики, исходящий трафик синхронизации реплики, отклонение трафика между брокерами, количество подключений брокера, количество очередей запросов брокера, брокер уровень простоя сети, задержка производства брокера, задержка потребления брокера, количество запросов производства брокера, количество запросов потребления брокера, количество лидеров, распределенных на брокере, количество реплик, распределенных на брокере, дисковый трафик на брокере, GC брокера и т. д. .

тематический мониторинг: размер/число записей в байтах входящего трафика темы, размер/номер записи исходящего трафика темы, тема без трафика, мутация трафика темы (внезапное увеличение/внезапное падение), задержка потребления темы.

мониторинг на уровне разделов: размер/количество записей в байтах входящего трафика раздела, размер/номер записи в байтах исходящего трафика раздела, отсутствующая копия раздела темы, запись о задержке потребления раздела, переключение лидера раздела, перекос данных раздела (при создании сообщений легко указать сообщение ключ при создании сообщения Вызывает перекос данных, что серьезно влияет на производительность службы Kafka), размер хранилища раздела (может управлять темами, которые слишком велики для одного раздела).

Мониторинг на уровне пользователя: размер в байтах исходящего/входящего трафика пользователя, ограниченное время исходящего/входящего трафика пользователя и внезапное изменение (внезапное увеличение/внезапное падение) пользовательского трафика.

Мониторинг журнала службы брокера: Мониторинг и оповещение журнала ошибок, распечатываемого на стороне сервера, для своевременного обнаружения исключений службы.

1.4.3. Мониторинг клиентов

Мониторинг клиентов в основном сам по себе реализует набор процедур отчетности по показателям, которые необходимо внедрить

Интерфейс org.apache.kafka.common.metrics.MetricsReporter. Затем добавьте элемент конфигурации metric.reporters в конфигурацию производителя или потребителя следующим образом:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
//ClientMetricsReporter类实现org.apache.kafka.common.metrics.MetricsReporter接口
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...

Адрес официального сайта клиентского индикатора:

kafka.apache.org/21/документ…

kafka.apache.org/21/документ…

kafka.apache.org/21/документ…

kafka.apache.org/21/документ…

kafka.apache.org/21/документ…

kafka.apache.org/21/документ…

Архитектура процесса мониторинга клиента показана ниже:

1.4.3.1 Мониторинг клиента производителя

измерение: имя пользователя, идентификатор клиента, IP-адрес клиента, имя темы, имя кластера, IP-адрес брокера;

показатель: количество подключений, время ожидания ввода-вывода, размер производственного трафика, количество производственных записей, количество запросов, задержка запроса, ошибки/повторные попытки отправки и т. д.

1.4.3.2 Мониторинг клиентов-потребителей

измерение: имя пользователя, идентификатор клиента, IP-адрес клиента, имя темы, имя кластера, группа потребителей, IP-адрес брокера, раздел темы;

показатель: количество подключений, время ожидания ввода-вывода, размер потребляемого трафика, количество записей потребления, задержка потребления, запись задержки потребления раздела темы и т. д.

1.4.4 Мониторинг зоопарка

  1. Мониторинг процессов Zookeeper;

  2. Мониторинг переключения лидеров Zookeeper;

  3. Мониторинг журнала ошибок службы Zookeeper;

1.4.5 Полный мониторинг канала

Когда канал передачи данных очень длинный (например: бизнес-приложение->скрытый SDk->сбор данных->Kafka->вычисления в реальном времени->бизнес-приложение), нам обычно приходится неоднократно связываться и проверять несколько команд, чтобы найти В каком звене возникает проблема, эффективность устранения неполадок относительно низкая. В этом случае нам нужно разобраться с мониторингом всего линка вместе с восходящим и нисходящим потоком. При возникновении проблемы найдите ссылку, по которой проблема возникает в первый раз, и сократите время обнаружения проблемы и устранения неисправности.

1.5 Изоляция ресурсов

1.5.1 Физическая изоляция различных бизнес-ресурсов в одном кластере

Мы физически изолируем группы ресурсов для разных бизнесов во всех кластерах, чтобы избежать взаимного влияния между бизнесами. Здесь мы предполагаем, что в кластере есть 4 брокерских узла (Брокер1/Брокер2/Брокер3/Брокер4), 2 бизнеса (Бизнес A/Бизнес B), и они имеют распределение разделов по темам, как показано на рисунке ниже, обе бизнес-темы разбросаны по На каждом брокере кластера, а также кроссовер на уровне диска.

Представьте, что одна из наших служб работает ненормально, например, из-за внезапного увеличения трафика, что приводит к ненормальному состоянию или приостановке работы узла брокера. В это время будет затронут и другой бизнес, что сильно повлияет на доступность наших услуг, вызовет сбои и расширит масштаб сбоев.

В ответ на эти болевые точки мы можем изолировать физические ресурсы служб в кластере, каждая служба имеет эксклюзивные ресурсы и разделяет группы ресурсов (здесь четыре брокера разделены на две группы ресурсов, Group1 и Group2). распределен в своей собственной группе ресурсов. Когда одна из служб выходит из строя, это не повлияет на другую. Это может эффективно уменьшить масштаб наших сбоев и повысить доступность службы.

1.6 Кластерная классификация

Мы делим кластеры на кластеры журналов, кластеры мониторинга, кластеры биллинга, поисковые кластеры, офлайн-кластеры и онлайн-кластеры в соответствии с бизнес-характеристиками.Бизнесы в разных сценариях помещаются в разные кластеры, чтобы избежать взаимного влияния между разными бизнесами.

1.7 Увеличение/уменьшение емкости

1.7.1 Раздел расширения темы

По мере роста объема данных темы количество разделов, указанное изначально созданной нами темой, может больше не соответствовать требованиям к объему и трафику, поэтому нам необходимо расширить раздел темы. Есть несколько вещей, которые следует учитывать при расширении раздела:

Необходимо следить за тем, чтобы опрос лидера и фолловера топика распределялся по всем брокерам в группе ресурсов, чтобы распределение трафика было более сбалансированным, при этом необходимо учитывать распределение разных реплик одного и того же раздела по стойки для повышения устойчивости к стихийным бедствиям;

При наличии остатка, когда количество лидеров секций темы делится на количество узлов группы ресурсов, лидер оставшейся секции должен быть отдан в приоритет брокерам с более низким трафиком.

1.7.2 Брокер онлайн

Поскольку объем бизнеса увеличивается, а объем данных продолжает расти, нашему кластеру также необходимо расширять емкость узлов-брокеров. Что касается масштабирования, нам необходимо реализовать следующее:

Интеллектуальная оценка расширения емкости: в зависимости от нагрузки кластера оценка необходимости расширения емкости запрограммирована и интеллектуальна;

Интеллектуальное расширение емкости: после оценки потребности в расширении емкости выберите платформу для процесса расширения емкости и балансировки трафика.

1.7.3 Брокер в автономном режиме

В некоторых сценариях нам необходимо отключить нашего брокера, в основном это следующие сценарии:

Некоторые устаревшие серверы должны быть отключены, чтобы реализовать платформизацию узла в автономном режиме;

Сбой сервера, сбой брокера не может быть восстановлен, нам нужно отключить отказавший сервер, чтобы реализовать платформизацию узла в автономном режиме;

Сервер с лучшей конфигурацией заменяет существующий узел брокера, чтобы реализовать платформизацию автономного узла.

1.8 Балансировка нагрузки

Зачем нужна балансировка нагрузки? Во-первых, давайте посмотрим на первое изображение.На следующем рисунке показано распределение трафика группы ресурсов в нашем кластере сразу после расширения.Трафик не может автоматически распределяться между нашими недавно расширенными узлами. В настоящее время нам нужно вручную инициировать миграцию данных и перенести некоторые копии на новый узел, чтобы достичь баланса трафика.

Далее давайте посмотрим на вторую картинку. Из этой картинки видно, что распределение потока очень неравномерно, а отклонение между минимальным и максимальным расходом более чем в несколько раз. Это связано с архитектурными особенностями Kafka, когда масштаб кластера и объем данных достигают определенного объема, неизбежно возникают проблемы. В этом случае нам также необходимо выполнить балансировку нагрузки.

Давайте посмотрим на третью картинку. Здесь мы видим, что только некоторые узлы резко увеличивают исходящий трафик, это связано с тем, что тематические партиции недостаточно разбросаны внутри кластера, а распределены централизованно по нескольким брокерам, в этом случае нам также нужно расширить партицию и сбалансировать .

Наше идеальное распределение трафика должно быть таким, как показано на рисунке ниже.Отклонение трафика между узлами очень мало.В этом случае это может не только повысить способность кластера противостоять аномальным внезапным увеличениям трафика, но и улучшить общий ресурс стабильность использования и обслуживания кластера, снижение затрат.

Для балансировки нагрузки нам необходимо добиться следующих эффектов:

1) Создать план миграции копии и выполнить платформу задач миграции, автоматизацию и интеллект;

2) После выполнения балансировки трафик между брокерами относительно равномерен, и один тематический раздел равномерно распределяется по всем узлам брокера;

3) После выполнения балансировки трафик между несколькими дисками внутри брокера относительно сбалансирован;

Для достижения этого эффекта нам необходимо разработать собственный набор инструментов балансировки нагрузки, таких как вторичная разработка круиз-контроля с открытым исходным кодом; ядром этого инструмента является стратегия генерации планов миграции, которая напрямую влияет на конечный эффект балансировки нагрузки кластера. Справочное содержание:

1. linkedIn/cruise-control

2. Introduction to Kafka Cruise Control

3. Cloudera Cruise Control REST API Reference

Схема архитектуры круиз-контроля выглядит следующим образом:

При создании плана миграции нам необходимо учитывать следующие моменты:

1) Выберите основные показатели в качестве основы для создания плана миграции, такие как исходящий трафик, входящий трафик, стойки и разброс разделов одной темы;

2) оптимизировать выборки индикаторов, используемые для создания плана миграции, например, отфильтровать ненормальные выборки, такие как внезапное увеличение/падение/нулевое падение трафика;

3) все образцы, используемые в плане миграции каждой группы ресурсов, являются внутренними образцами группы ресурсов, никакие другие группы ресурсов не задействованы и нет дублирования;

4) Управление слишком большой темой в одном разделе, чтобы распределение разделов темы было более рассредоточенным, трафик не концентрировался на некоторых брокерах, а объем данных одного раздела темы был меньше, что может уменьшить объем данных, подлежащих переносу, и повышение скорости переноса;

5) Для тем, которые были равномерно рассредоточены по группе ресурсов, добавить их в черный список миграции и не мигрировать, что может уменьшить объем переносимых данных и повысить скорость миграции;

6) Делать управление темами, чтобы исключить вмешательство долговременных не траффичных тем на баланс;

7) При расширении новой темы или раздела темы следует опрашивать все разделы на всех узлах-брокерах, после опроса оставшиеся разделы распределять преимущественно между брокерами с меньшим трафиком;

8) Когда балансировка нагрузки включена после расширения узла брокера, тому же брокеру отдается приоритет для выделения одного и того же большого потока (большой поток, а не большое пространство для хранения, что можно рассматривать как пропускную способность в секунду) тема нескольких лидеров разделов, и перенести некоторые из них на новый узел брокера;

9) При подаче задачи миграции отклонение размера данных раздела в одном и том же пакете планов миграции должно быть как можно меньше, чтобы избежать длительного ожидания миграции большого раздела после миграции небольшой раздел в задаче миграции, что приведет к перекосу задачи;

1.9 Сертификация безопасности

Доступ к нему бесплатный для всех в нашем кластере? Конечно нет, для безопасности кластера нам необходимо выполнить авторизационную аутентификацию для блокировки незаконных операций. В основном он включает следующие аспекты, требующие сертификации безопасности:

(1) сертификация органа производителя;

(2) проверка подлинности потребителя;

(3) Укажите сертификацию безопасности миграции каталогов данных;

Адрес официального сайта:kafka.apache.org

1.10 Аварийное восстановление кластера

Аварийное восстановление между стойками:

Адрес официального сайта:kafka.apache.org

Межкластерное/компьютерное аварийное восстановление: Если есть бизнес-сценарии, такие как удаленный активный-активный, вы можете обратиться к MirrorMaker 2.0 Kafka 2.7.

Адрес гитхаба:github.com

Точный адрес КИП:cwiki.apache.org

Восстановление метаданных Kafka в кластере ZooKeeper: мы будем регулярно создавать резервные копии данных информации о разрешениях в ZooKeeper и использовать их для восстановления, когда метаданные кластера ненормальны.

1.11 Оптимизация параметров/конфигурации

Оптимизация параметров брокерского сервиса: Здесь я перечисляю только некоторые основные параметры, влияющие на производительность.

num.network.threads
#创建Processor处理网络请求线程个数,建议设置为broker当CPU核心数*2,这个值太低经常出现网络空闲太低而缺失副本。
 
num.io.threads
#创建KafkaRequestHandler处理具体请求线程个数,建议设置为broker磁盘个数*2
 
num.replica.fetchers
#建议设置为CPU核心数/4,适当提高可以提升CPU利用率及follower同步leader数据当并行度。
 
compression.type
#建议采用lz4压缩类型,压缩可以提升CPU利用率同时可以减少网络传输数据量。
 
queued.max.requests
#如果是生产环境,建议配置最少500以上,默认为500。
 
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
#这几个参数表示日志数据刷新到磁盘的策略,应该保持默认配置,刷盘策略让操作系统去完成,由操作系统来决定什么时候把数据刷盘;
#如果设置来这个参数,可能对吞吐量影响非常大;
 
auto.leader.rebalance.enable
#表示是否开启leader自动负载均衡,默认true;我们应该把这个参数设置为false,因为自动负载均衡不可控,可能影响集群性能和稳定;

Оптимизация производства: Здесь я перечисляю только некоторые основные параметры, влияющие на производительность.

linger.ms
#客户端生产消息等待多久时间才发送到服务端,单位:毫秒。和batch.size参数配合使用;适当调大可以提升吞吐量,但是如果客户端如果down机有丢失数据风险;
 
batch.size
#客户端发送到服务端消息批次大小,和linger.ms参数配合使用;适当调大可以提升吞吐量,但是如果客户端如果down机有丢失数据风险;
 
compression.type
#建议采用lz4压缩类型,具备较高的压缩比及吞吐量;由于Kafka对CPU的要求并不高,所以,可以通过压缩,充分利用CPU资源以提升网络吞吐量;
 
buffer.memory
#客户端缓冲区大小,如果topic比较大,且内存比较充足,可以适当调高这个参数,默认只为33554432(32MB)
 
retries
#生产失败后的重试次数,默认0,可以适当增加。当重试超过一定次数后,如果业务要求数据准确性较高,建议做容错处理。
 
retry.backoff.ms
#生产失败后,重试时间间隔,默认100ms,建议不要设置太大或者太小。

В дополнение к оптимизации некоторых основных параметров нам также необходимо учитывать количество разделов темы и время хранения темы; если количество разделов слишком мало, а время хранения слишком велико, но объем записываемых данных очень велик, могут вызвать следующие проблемы:

1) Тематические разделы сосредоточены на нескольких узлах-брокерах, что приводит к дисбалансу реплик трафика;

2) Нагрузка на чтение и запись некоторых дисков внутри узла брокера перегружена, и запись в хранилище идет на разрыв;

1.11.1 Оптимизация потребления

Самая большая проблема потребления, и проблема, которая часто возникает, — это задержка потребления, подтягивание исторических данных. При извлечении большого объема исторических данных будет происходить большое количество операций чтения с диска, загрязняющих кеш страниц, что увеличит нагрузку на диск и повлияет на производительность и стабильность кластера;

Как мы можем уменьшить или избежать больших задержек потребления?

1) Когда объем данных темы очень велик, рекомендуется запустить поток для потребления в разделе;

2) Добавьте сигналы мониторинга к задержкам потребления тем, чтобы вовремя обнаруживать и устранять их;

3) Когда данные темы могут быть отброшены и столкнуться с большой задержкой, например, запись задержки в одном разделе превышает 10 миллионов или даже сотни миллионов, тогда точка потребления темы может быть сброшена для экстренной обработки; [Это решение обычно используется в экстремальных сценариях]

4) Избегайте сброса смещения раздела темы в очень раннее положение, что может привести к извлечению большого количества исторических данных;

1.11.2 Оптимизация параметров Linux-сервера

Нам нужно оптимизировать дескрипторы файлов Linux, кэш страниц и другие параметры. Ссылаться на "Применение настройки кэша страниц Linux в Kafka".

1.12 Аппаратная оптимизация

Оптимизация диска

Если позволяют условия, твердотельные накопители SSD можно использовать вместо механических жестких дисков HDD, чтобы решить проблему низкой производительности операций ввода-вывода механических дисков; если нет твердотельного накопителя SSD, можно сделать жесткий RAID (обычно RAID10) для несколько жестких дисков на сервере. , чтобы нагрузка ввода-вывода узла брокера была более сбалансированной. Если это механический жесткий диск HDD, брокер может установить несколько жестких дисков, например, 12*4 ТБ.

ОЗУ

Поскольку Kafka — это служба высокочастотного чтения и записи, а запросы на чтение и запись в Linux в основном проходят через кэш страниц, большая одноузловая память значительно повысит производительность. Обычно выбирайте 256 ГБ или выше.

Интернет

Увеличьте пропускную способность сети: если позволяют условия, чем больше пропускная способность сети, тем лучше. Потому что таким образом пропускная способность сети не станет узким местом в производительности, и по крайней мере 10-гигабитная сеть (10Gb, сетевая карта полнодуплексная) может иметь относительно высокую пропускную способность. Если это один канал, теоретический верхний предел суммы исходящего и входящего сетевого трафика составляет 1,25 ГБ/с; если это дуплексный двухканальный, теоретическое значение сетевого входящего и исходящего трафика может достигать 1,25 ГБ/с. с.

Маркировка сетевой изоляции: поскольку в компьютерном зале могут быть развернуты как автономные кластеры (такие как HBase, Spark, Hadoop и т. д.), так и кластеры реального времени (такие как Kafka). Тогда кластер реального времени и автономный кластер, установленный на одном коммутаторе, будут конкурировать за пропускную способность сети, и автономный кластер может влиять на кластер реального времени. Поэтому нам нужно изолировать уровень коммутатора, чтобы автономные машины и кластеры реального времени не подключались к одному и тому же коммутатору. Даже если он смонтирован под тем же коммутатором, мы отметим приоритет сетевого трафика (золото, серебро, медь, железо), чтобы при ограниченной пропускной способности сети приоритет отдавался бизнесу в реальном времени.

CPU

Узким местом Kafka является не ЦП, как правило, на один узел достаточно ЦП с 32 ядрами.

1.13 Платформизация

Теперь вопрос в том, как мы уже упоминали много мониторинга, оптимизации и других средств, мы бизнес-пользователя или администратора для всех операций, необходимых для кластерного сервера кластера? Конечно, нет, нам нужны богатые возможности платформы для поддержки. С одной стороны, это повышает эффективность наших операций, а с другой стороны, также повышает стабильность работы кластера и снижает вероятность ошибки.

Управление конфигурацией

Работает черный экран, каждый раз, когда изменяется файл конфигурации server.properties брокера, нет записи об изменении, которую нужно отследить.Иногда некоторые ошибки могут быть вызваны тем, что кто-то изменил конфигурацию кластера, но соответствующие записи не могут быть найдены. Если мы реализуем управление конфигурацией на платформе, каждое изменение можно будет отследить, и риск ошибок при изменении снизится.

последовательный перезапуск

Когда нам нужно внести онлайн-изменения, иногда нам нужно выполнить последовательный перезапуск на нескольких узлах в кластере.Если мы работаем в командной строке, эффективность становится очень низкой, и требуется ручное вмешательство, что тратит впустую рабочую силу. В настоящее время нам необходимо создать платформу для этой повторяющейся работы, чтобы повысить нашу операционную эффективность.

Управление кластером

Управление кластером в основном реализует ряд операций, которые изначально выполнялись в командной строке на платформе, так что пользователям и администраторам больше не нужно управлять кластером Kafka на черном экране, это имеет следующие преимущества:

повысить эффективность работы;

Вероятность ошибок при работе меньше, а кластер более защищен;

Все операции прослеживаемы и прослеживаемы;

Управление кластером в основном включает в себя: управление брокером, управление темами, управление полномочиями производства/потребления, управление пользователями и т. д.

1.13.1 фиктивная функция

Обеспечьте тему пользователя функцией создания выборки данных и выборки потребления на платформе.Пользователи могут проверить, можно ли использовать тему и являются ли разрешения нормальными, без написания кода самостоятельно;

Обеспечьте тему пользователя функцией проверки разрешения на производство/потребление на платформе, чтобы пользователь мог определить, есть ли у его учетной записи разрешение на чтение и запись в тему;

1.13.2 Управление правами

Платформенное управление правами чтения/записи пользователей и другие связанные операции.

1.13.3 Увеличение/уменьшение емкости

Узел брокера находится в сети и в автономном режиме на платформе, и всем узлам в сети и в автономном режиме больше не требуется работать с командной строкой.

1.13.4 Управление кластером

1) Управление темами без трафика, очистка тем в кластере без трафика и снижение нагрузки на кластер, вызванной слишком большим количеством бесполезных метаданных;

2) Чтобы управлять размером данных тематических разделов, отсортируйте темы с чрезмерным объемом данных тематических разделов (например, объем данных одного раздела превышает 100 ГБ/день), чтобы увидеть, необходимо ли расширение, чтобы избежать концентрации данных на некоторых узлах. кластера;

3) Управление перекосом данных раздела темы, чтобы клиент не указывал ключ сообщения при создании сообщения, но ключ слишком сконцентрирован, и сообщение сосредоточено только в некоторых разделах, что приводит к перекосу данных;

4) Децентрализованное управление тематическими разделами, так что тематические разделы распределяются по как можно большему количеству брокеров в кластере, что может избежать риска концентрации трафика на нескольких узлах из-за внезапного увеличения трафика темы, а также может избежать Влияние исключения брокера на тему Очень большое;

5) Управление задержкой потребления разделов темы: как правило, существует две ситуации, когда происходит большое отложенное потребление: во-первых, производительность кластера снижается, а во-вторых, параллелизм потребления бизнес-стороны недостаточен. Потребитель недостаточно параллелен Должен быть связан с бизнесом, чтобы увеличить параллелизм потребления.

1.13.5 Мониторинг аварийных сигналов

1) Превратите всю коллекцию индексов в настраиваемую платформу, предоставьте унифицированную коллекцию индексов, отображение индексов и платформу сигналов тревоги, а также реализуйте интегрированный мониторинг;

2) связывать восходящие и нисходящие службы для осуществления полноканального мониторинга;

3) Пользователи могут настраивать сигналы мониторинга, такие как задержка трафика темы или раздела, мутация и т. д.;

1.13.6 Бизнес большой экран

Основные показатели бизнес-экрана: количество кластеров, количество узлов, ежедневный входящий трафик, ежедневный входящий трафик, трафик восхода солнца, трафик восхода солнца, входящий трафик в секунду, входящий трафик в секунду и размер исходящего трафика в секунду, количество записей исходящего трафика в секунду, количество пользователей, задержка производства, задержка потребления, надежность данных, доступность службы, размер хранилища данных, количество групп ресурсов, количество тем, количество разделов, количество реплик и индикаторы групп потребления, такие как числа.

1.13.7 Ограничение потока

Пользовательский трафик теперь находится на платформе, и на платформе выполняется интеллектуальная обработка ограничения тока.

1.13.8 Балансировка нагрузки

Функция автоматической балансировки нагрузки реализована на платформе, а планирование и управление осуществляются через платформу.

1.13.9 Бюджет ресурсов

Когда кластер достигает определенного масштаба, скорость потока растет, то где машина расширения кластера? Бюджет бизнес-ресурсов позволяет нескольким предприятиям в кластере разделить затраты на оборудование всего кластера в соответствии с их трафиком в кластере; конечно, независимые кластеры и независимые группы ресурсов, методы бюджета могут рассчитываться отдельно.

1.14 Оценка производительности

1.14.1 Оценка производительности одного брокера

Цель нашей оценки производительности одного брокера включает следующие аспекты:

1) Обеспечить основу для нашей оценки приложений ресурсов;

2) Сообщите нам больше о возможностях чтения и записи кластера и о том, где находится узкое место, и выполните оптимизацию для узкого места;

3) Обеспечить основу для нашей текущей пороговой настройки ограничения;

4) предоставить основу для нашей оценки того, когда нам следует расширяться;

1.14.2 Оценка производительности тематического раздела

1) При создании темы для нас оценка должна дать разумную основу для того, сколько разделов следует указать;

2) Обеспечить основу для оценки расширения раздела нашей темы;

1.14.3 Оценка производительности одного диска

1) Чтобы мы поняли реальные возможности чтения и записи на диске и предоставили нам основу для выбора более подходящего типа диска для Kafka;

2) Предоставить нам основу для установки порога оповещения о трафике диска;

1.14.4 Исследование пределов размера кластера

1) Нам необходимо понять верхний предел размера отдельного кластера или верхний предел размера метаданных и изучить влияние соответствующей информации на производительность и стабильность кластера;

2) Согласно тщательному расследованию, оценить разумный диапазон размера узла кластера, вовремя предсказать риски и выполнить такую ​​работу, как разделение сверхбольших кластеров;

1.15 Сетевая архитектура DNS+LVS

Когда наши узлы кластера достигают определенного масштаба, например сотни узлов-брокеров в одном кластере, тогда, когда мы указываем конфигурацию bootstrap.servers для рабочих и потребительских клиентов, что, если мы указываем ее? Вы выбираете несколько конфигураций брокера или все сразу?

На самом деле вышеперечисленные способы не подходят, если настроено только несколько IP, то при настройке нескольких узлов брокера на выход в офлайн наше приложение не сможет подключиться к кластеру Kafka, если настроены все IP, то даже больше нереально, сотни IP, так что же делать?

строить планы: Используя сетевую архитектуру DNS+LVS, конечным клиентам-производителям и клиентам-потребителям нужно только настроить доменное имя. Следует отметить, что при присоединении новой ноды к кластеру необходимо добавить карту, при отключении ноды ее необходимо выкинуть из карты, в противном случае, если эти машины используются в другом месте, если порт один и тот же как и у Kafka, получается, что некоторые запросы из кластера будут отправляться на автономный сервер, вызывая сбои ключей в производственной среде.

2. Функциональные дефекты версии с открытым исходным кодом

Основными особенностями протокола RTMP являются: мультиплексирование, пакетирование и протокол прикладного уровня. Эти особенности будут подробно описаны ниже.

2.1 Миграция копирования

Добавочная миграция невозможна; [мы внедрили добавочную миграцию на основе преобразования исходного кода версии 2.1.1]

Невозможно реализовать параллельную миграцию; [версия с открытым исходным кодом не реализовывала параллельную миграцию до версии 2.6.0]

Невозможно завершить миграцию; [Мы реализовали прекращение миграции копии на основе преобразования исходного кода версии 2.1.1] [Версия с открытым исходным кодом не реализовала приостановку миграции до версии 2.6.0, что несколько отличается от прекращения миграции и не будет откатывать метаданные]

При указании каталога данных миграции во время процесса миграции, если время хранения темы сокращается, время хранения темы не будет действовать для переносимого раздела темы, а просроченные данные раздела темы не могут быть удалены; [Открытый исходный код ошибка версии, еще не исправлена]

Когда указан каталог данных миграции, когда план миграции находится в следующих сценариях, вся задача миграции не может завершить миграцию и всегда зависает; [Ошибка версии с открытым исходным кодом, еще не исправлена]

В процессе миграции, если узел брокера перезапущен, все ведущие разделы на этом узле брокера не могут быть переключены обратно, в результате чего весь трафик узла будет передан другим узлам, и лидер не переключится обратно, пока не будут перенесены все реплики; [ ошибка версии с открытым исходным кодом, в настоящее время не исправлена].

在原生的Kafka版本中存在以下指定数据目录场景无法迁移完毕的情况,此版本我们也不决定修复次bug:
 
1.针对同一个topic分区,如果部分目标副本相比原副本是所属broker发生变化,部分目标副本相比原副本是broker内部所属数据目录发生变化;
那么副本所属broker发生变化的那个目标副本可以正常迁移完毕,目标副本是在broker内部数据目录发生变化的无法正常完成迁移;
但是旧副本依然可以正常提供生产、消费服务,并且不影响下一次迁移任务的提交,下一次迁移任务只需要把此topic分区的副本列表所属broker列表变更后提交依然可以正常完成迁移,并且可以清理掉之前未完成的目标副本;
 
这里假设topic yyj1的初始化副本分布情况如下:
 
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]}
]
}
//迁移场景1:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]}
]
}
 
//迁移场景2:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]}
]
}
针对上述的topic yyj1的分布分布情况,此时如果我们的迁移计划为“迁移场景1”或迁移场景2“,那么都将出现有副本无法迁移完毕的情况。
但是这并不影响旧副本处理生产、消费请求,并且我们可以正常提交其他的迁移任务。
为了清理旧的未迁移完成的副本,我们只需要修改一次迁移计划【新的目标副本列表和当前分区已分配副本列表完全不同即可】,再次提交迁移即可。
 
这里,我们依然以上述的例子做迁移计划修改如下:
{
"version":1,
"partitions":[
{"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]}
]
}
这样我们就可以正常完成迁移。

2.2 Протокол трафика

Детализация текущего ограничения является грубой, недостаточно гибкой, достаточно точной и недостаточно интеллектуальной.

Текущая ограничивающая комбинация размеров

/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>

Существует проблема

Когда есть несколько пользователей одновременно, много производства и потребления на одном и том же брокере, брокер может захотеть, чтобы он работал правильно, он должен разрешать все пороги пользовательского трафика и когда не превышать текущий лимит пропускной способности, лимит брокера; если больше выше верхнего предела брокера, есть риск того, что брокер был бит зависшим; однако, даже если поток пользовательского трафика не достигает верхнего предела брокера, но если весь пользовательский трафик сосредоточить на нескольких дисках, читающих и записывающих больше, чем нагрузка на диске может привести к тому, что все производство, запрос на потребление будет заблокирован, брокер может находиться в состоянии анабиоза.

решение

(1) Измените исходный код, чтобы реализовать верхний предел трафика одного брокера.Пока трафик достигает верхнего предела брокера, обработка текущего лимита выполняется немедленно, и все пользователи, которые пишут этому брокеру, могут быть ограниченным, или пользователь имеет приоритет, и ему присвоен высокий приоритет, ограничить низкий приоритет;

(2) Преобразовать исходный код, чтобы реализовать верхний предел трафика одного диска на брокере (во многих случаях трафик концентрируется на нескольких дисках, в результате чего не достигается верхний предел трафика брокера, но превышается верхний предел емкости чтения и записи одного диска), пока трафик диска достигает верхнего предела, обработка текущего предела выполняется немедленно, и все пользователи, записывающие на этот диск, могут быть ограничены; или пользователь имеет приоритет , высокоприоритетные удаляются, а низкоприоритетные ограничиваются;

(3) Преобразование исходного кода для реализации текущего ограничения размера темы и функции запрета записи для раздела темы;

(4) Преобразование исходного кода для достижения точного текущего ограничения пользователя, брокера, диска, темы и других параметров;

Три, тенденция развития кафки

3.1 План итерации сообщества Kafka

3.2 Постепенный отказ от ZooKeeper (KIP-500)

3.3 Контроллер отделен от брокера, а в качестве арбитражного механизма контроллера введен протокол плота (KIP-630)

3.4 Многоуровневое хранилище (КИП-405)

3.5 Можно уменьшить тематические разделы (KIP-694)

3.6 MirrorMaker2 ровно один раз (KIP-656)

3.7 Загрузка и описание функций каждой версии

3.8 Kafka все адреса KIP

4. Как внести свой вклад в сообщество

4.1 Какие точки могут способствовать

kafka.apache.org/cont я вас не упоминаю...

4.2 адрес вклада вики

Из wiki.Apache.org/confluence/...

4.3 адрес проблемы

1)Issues.apache.org/так как ah/pro является ec…

2)Issues.apache.org/sinceah/secure…

4.4 Основные коммиттеры

kafka.apache.org/committers

Автор: команда интернет-сервера vivo - Ян Ицзюнь