Вы знаете эти принципы Кафки?

задняя часть Kafka

Знание внутренней работы Kafka не обязательно, если вы просто разрабатываете приложения Kafka или просто используете Kafka в производстве. Однако понимание внутренней работы Kafka помогает понять поведение Kafka, а также быстро диагностировать проблемы. Давайте обсудим эти три вопроса

  • Как Кафка копирует
  • Как Kafka обрабатывает запросы от производителей и потребителей
  • Каковы детали хранения Kafka

Если вы заинтересованы, пожалуйста, не торопитесь и терпеливо прочитайте эту статью.

Отношения между членами кластера

Мы знаем, что Kafka запускается на ZooKeeper, потому что ZooKeeper — это форма кластеров, поэтому Kafka также присутствует для формирования кластера. Это также включает в себя вопрос о том, как координировать несколько производителей и нескольких потребителей, связь между этим кластером поддерживается ZooKeeper для завершения. Если вы читали мою предыдущую статью (На самом деле, достаточно прочитать эту статью о начале работы с Кафкой), вы должны знать, что будет несколько主机(broker), у каждого брокера будет одинbroker.id, каждый broker.id имеет уникальный идентификатор, позволяющий его отличить.Этот идентификатор можно указать вручную в файле конфигурации или сгенерировать автоматически.

Kafka может сгенерировать новый Broker.id с помощью Broker.id.generation.enable и Reserved.broker.max.id.

Параметр Broker.id.generation.enable используется для настройки включения или выключения функции автоматической генерации Broker.id, по умолчанию true, то есть эта функция включена. Автоматически созданный Broker.id имеет значение по умолчанию 1000, что означает, что автоматически созданный Broker.id начинается с 1001 по умолчанию.

Kafka будет в ZooKeeper при запуске/brokers/idsЗарегистрируйте временный узел с тем же идентификатором, что и у текущего брокера, по пути. Проверка работоспособности Kafka зависит от этого узла. Эти компоненты уведомляются, когда брокер присоединяется к кластеру или покидает его.

  • Если бы вы запустили другого брокера с тем же идентификатором, вы бы получили сообщение об ошибке — новый брокер попытается зарегистрироваться, но безуспешно, потому что в ZooKeeper уже есть брокер с таким же идентификатором.
  • Когда брокер не работает, возникает раздел или длительная пауза сборки мусора, брокер будет отключен от ZooKeeper, а временные узлы, созданные брокером при запуске, будут удалены из ZooKeeper. Компоненты Kafka, прослушивающие список брокеров, будут уведомлены об удалении брокера.
  • Когда брокер закрывается, соответствующий ему узел также исчезает, но его идентификатор продолжает существовать в других структурах данных, таких как список реплик темы, а список реплик будет скопирован позже. После полного закрытия брокера, если вы запустите другого совершенно нового брокера с тем же идентификатором, он немедленно присоединится к кластеру и будет иметь тот же раздел и тему, что и старый брокер.

Роль контролера брокера

Когда мы говорили о Kafka Rebalance ранее, мы упомянули координатора группы, который отвечает за координацию отношений между группами, а также координатора группы между брокерами.Компонент контроллера (Controller), который является основным компонентом Kafka. Его основная роль заключается в управлении и координации всего кластера Kafka с помощью ZooKeeper.Каждый брокер в кластере может называться контроллером, но после запуска кластера Kafka только один брокер станет контроллером. Так как кластер Kafka зависим от кластера ZooKeeper, необходимо ввести ZooKeeper какой, можете обратиться к статье автора (ZooKeeper — это не просто реестр, что еще вы знаете?) для деталей, просто кратко упомянутых здесьznodeПроблема с узлом.

Данные ZooKeeper хранятся на узлах, каждый узел также называетсяznode, узел znode представляет собой древовидную файловую структуру, которая очень похожа на путь к файлу в операционной системе Linux.Корневой узел ZooKeeper/.

Znode можно разделить на временный узел и постоянный узел в соответствии с способом постоянства данных. Настойчивые узлы не исчезают из-за изменений состояния зоофидера, но эфемерные узлы автоматически исчезают с Zookeeper перезапускается.

узлы znode имеютWatcherМеханизм: при изменении данных ZooKeeper генерирует событие Watcher и отправляет его клиенту. Механизм мониторинга Watcher — очень важная функция Zookeeper. На основе узлов, созданных в Zookeeper, мы можем привязывать к этим узлам события мониторинга, такие как мониторинг изменений данных узлов, удаление узлов и изменение состояния дочерних узлов. Благодаря этому механизму событий, Распределенные блокировки, управление кластером и другие функции могут быть реализованы на базе ZooKeeper.

выборы контролера

Текущие правила Kafka для выбора контроллеров: первый брокер, запущенный в кластере Kafka, создает временный узел в ZooKeeper./controllerСделайте себя контроллером контроллера. Другие брокеры также попытаются создать этот узел при запуске, но, поскольку этот узел уже существует, они получат уведомление позже, когда захотят создать узел /controller.Узел уже существуетисключение. Затем другой брокер регистрирует объект наблюдения ZooKeeper на этом контроллере,/controllerКогда узел изменится, другие брокеры получат уведомление об изменении узла. Таким образом, вы можете убедиться, что существует только один контроллер. Тогда должна быть проблема только с одним узлом, т.е.单点问题.

Эфемерные узлы в ZooKeeper исчезают, если контроллер выходит из строя или отключается от ZooKeeper. После того, как другие узлы в кластере получат сообщение о том, что контроллер находится в автономном режиме, отправленное объектом наблюдения, другие узлы-брокеры попытаются сделать себя новым контроллером. Правила создания других узлов соответствуют принципам создания первого узла.Первый брокер, который успешно создаст узел контроллера в ZooKeeper, станет новым контроллером, затем другие узлы получат существующее исключение узла.Затем создайте часы объект снова на новом узле контроллера для прослушивания.

Роль контролера

Итак, сказав все это, что такое контроль? Какова роль контролера? Или вот такой контроллер组件Он предназначен для этого? Не волнуйтесь, мы поговорим о следующем.

Kafka спроектирован как аналоговый конечный автомат многопоточного контроллера, он может иметь следующий эффект:

  • Контроллер эквивалентен менеджеру отдела (контроллеру брокера) в отделе (кластере), который используется для управления членами отдела (брокером) в отделе.

  • Контроллер — это монитор для всех брокеров, который используется для мониторинга онлайн и офлайн брокеров.

  • После отключения брокера контроллер может выбрать нового лидера раздела.

  • Контроллер может отправлять сообщения только что выбранному Лидеру брокера.

Его можно разделить на следующие 5 пунктов.

  • 主题管理: Kafka Controller может помочь нам выполнить операции по созданию, удалению и добавлению разделов в темы Kafka.Короче говоря, он имеет наивысшее право на использование разделов.

Другими словами, когда мы выполняемСкрипт Kafka-TopicsБольшая часть фоновой работы выполняется контроллером.

  • 分区重分配: Перераспределение разделов в основном относится кскрипт kafka-reassign-partitionsОбеспечивает детальную функцию распределения для существующих тематических разделов. Эта часть функции также реализуется контроллером.

  • Prefered 领导者选举: выбор предпочтительного лидера — это в основном изменение подхода Лидера к Кафке, чтобы избежать перегрузки той части, которую предоставляет Брокер.

  • 集群成员管理: Главное управление Добавлен брокер, отключение брокера, время простоя брокера

  • 数据服务: Последней основной работой контроллера является предоставление услуг передачи данных другим брокерам. Наиболее полная информация о метаданных кластера хранится на контроллере, и все остальные брокеры будут периодически получать от контроллера запросы на обновление метаданных для обновления кэшированных данных в своей памяти. Эти данные мы обсудим ниже

Когда контроллер обнаруживает, что посредник покинул кластер (наблюдая за соответствующим путем ZooKeeper), контроллер получает сообщение о том, что для тех разделов, которыми управляет этот посредник, требуется новый лидер. Контроллер по очереди обходит каждую секцию, определяет, кто может быть новым лидером, а затем отправляет сообщение всем секциям, содержащим нового лидера или существующих последователей, сообщение запроса, содержащее информацию о том, кто является новым лидером, а кто — ведомым. Впоследствии новый Лидер начинает обрабатывать запросы от производителей и потребителей, а Последователи используются для репликации от нового Лидера.

Это очень похоже на отдел аутсорсинговой компании, этот отдел предназначен для командировок, все работают в разных местах, но в центральном офисе есть начальник отдела, и вот начальник отдела внезапно ушел. Компания не планирует нанимать внешний персонал и решает выбрать человека с сильными способностями из отдела в качестве лидера. Затем человек, который становится лидером, должен отправить сообщение членам своей команды. Это сообщение состоит в том, чтобы назначить сообщение и уточните кем он руководит.Всем Всем известно, а потом они работают на отдел.

Когда контроллер видит посредника, присоединяющегося к кластеру, он использует идентификатор посредника, чтобы проверить, содержит ли вновь присоединенный посредник реплику существующего раздела. Если есть контроллер, он отправит сообщение вновь добавленному брокеру и существующему брокеру.

Приведенное выше содержание о репликации разделов будет обсуждаться далее.

хранилище данных контроллера брокера

上面我们介绍到 broker controller 会提供数据服务,用于保存大量的 Kafka 集群数据。 Как показано ниже

Информацию, хранящуюся выше, можно разделить на три категории:

  • Вся информация о брокере, включая все разделы в брокере, все реплики разделов брокера, какие брокеры работают в данный момент, а какие закрываются.
  • Вся информация о теме, включая информацию о конкретных разделах, например, кто является ведущей репликой, какие реплики входят в набор ISR и т. д.
  • Все разделы, связанные с оперативными задачами. Содержит список разделов, в которых в настоящее время проходят выборы предпочтительного лидера и переназначение разделов.

Kafka неотделима от ZooKeeper, поэтому эта информация также сохраняется в ZooKeeper. Всякий раз, когда контроллер инициализируется, он считывает соответствующие метаданные из ZooKeeper и заполняет свой собственный кэш.

отказоустойчивость контроллера брокера

Ранее мы говорили, что первый в ZooKeeper/brokers/idsБрокер, который создает узел, используется в качестве контроллера брокера, то есть существует только один контроллер брокера, поэтому неизбежно возникает проблема с единой точкой отказа. kafka предусматривает учет этого故障转移функция, то естьFail Over. Как показано ниже

В самом начале брокер 1 превентивно зарегистрируется, чтобы стать контроллером, а затем брокер 1 отключается из-за дрожания сети или других причин, ZooKeeper обнаруживает отключение брокера 1 через механизм Watch, после чего все оставшиеся в живых брокеры начинают соревноваться, чтобы стать контроллер. В это время брокер 3 предварительно регистрируется. Если это удается, информация о контроллере, хранящаяся в ZooKeeper, поступает от брокера 1 -> брокера 3. После этого брокер 3 будет считывать информацию метаданных из ZooKeeper и инициализировать ее в своем собственном кеше.

Примечание. В ZooKeeper хранится не кэшированная информация, а в брокере хранится только кэшированная информация.

Контроллер брокера существует

До версии Kafka 0.11 дизайн контроллера был довольно громоздким. Мы упоминали выше одно предложение: контроллер Kafka предназначен для многопоточности конечной машины аналогового контроллера, эта конструкция на самом деле, есть некоторые проблемы

  • Изменения состояния контроллера выполняются одновременно разными прослушивателями, что требует сложной синхронизации, подвержено ошибкам и затрудняет отладку.
  • Распространение состояний не синхронизировано, и брокер может появляться в нескольких состояниях с неопределенным временем, что может привести к ненужной дополнительной потере данных.
  • Контроллер Контроллер также создает дополнительные потоки ввода-вывода для удаления раздела, что приводит к снижению производительности.
  • Многопоточный дизайн контроллера также будет иметь доступ к общим данным. Мы знаем, что многопоточный доступ к общим данным является наиболее сложной частью синхронизации потоков. Чтобы защитить безопасность данных, контроллер должен использовать много кода в коде. .Механизм синхронизации ReentrantLock, что еще больше снижает скорость обработки всего контроллера.

Принцип внутренней конструкции контроллера брокера

После Kafka 0.11 контроллер Kafka имеет новый дизайн,Изменена многопоточная схема на однопоточную плюс схему очереди событий.. Как показано ниже

Основные внесенные изменения заключаются в следующем:

Первым улучшением является добавлениеEvent Executor Thread, поток выполнения события, как видно из рисунка, будь то очередь событий Event Queue или контекст контроллера. Контекст контроллера будет передан потоку выполнения события для обработки. Смоделируйте все первоначально выполненные операции в независимые события и отправьте их в выделенную очередь событий для использования этим потоком.

Второе улучшение заключается в изменении всех ранее синхронизированных ZooKeeper на异步操作. API ZooKeeper предоставляет два способа чтения и записи: синхронный и асинхронный. Ранее контроллер использовал метод синхронизации для работы ZooKeeper, в этот раз метод синхронизации был изменен на асинхронный, по результатам теста эффективность повысилась в 10 раз.

Третье улучшение заключается в обработке запросов в соответствии с приоритетом.Предыдущий дизайн заключался в том, что брокер справедливо обрабатывал запросы, отправленные всеми контроллерами. Что это обозначает? Разве это не справедливо? В некоторых случаях да, например, когда брокер стоит в очереди на обработку запроса на производство, контроллер выдаетStopReplicaзапрос, что бы вы сделали? Вы все еще обрабатываете запросы на продукцию? Этот запрос на производство все еще полезен? Наиболее разумный порядок обработки в это время должен быть,Дает более высокий приоритет запросу StopReplica, чтобы его можно было обработать упреждающе.

копировальный механизм

Функция репликации является основной функцией архитектуры Kafka.В документации Kafka Kafka описывает себя какРаспределенный, разделенный на разделы, может отправлять копию службы журнала. Причина, по которой репликация так важна, заключается в том, что очень важно постоянное хранение сообщений, которое может гарантировать высокую доступность Kafka даже после выхода из строя основного узла. Механизм копирования также можно назвать备份机制(Replication), обычно относится к распределенной системе, которая сохраняет одну и ту же резервную копию/копию данных на нескольких сетевых компьютерах.

Kafka использует темы для организации данных, каждая тема разделена на несколько разделов, разделы будут развернуты на одном или нескольких брокерах, и каждый раздел будет иметь несколько копий, поэтому копии также будут сохранены на брокере, каждый брокер может хранить тысячи реплик. На следующем рисунке представлена ​​схема копии копии

Как показано на рисунке выше, для простоты я нарисовал только два брокера, каждый брокер ссылается на сообщение, которое сохраняет топик.В брокере1 раздел 0 является ведущим, который отвечает за копирование раздела и копирование раздела 0 в брокере1. Реплика раздела 0 темы A брокера2. То же самое относится и к разделу 1 темы A.

Существует два типа копий: одинLeader(领导者)копия, одинFollower(跟随者)Копировать.

Копия лидера

Kafka должен выбрать реплику при создании раздела, и выбранная реплика является ведущей репликой.

Копия подписчика

Реплики, отличные от реплики лидера, вместе называютсяFollower 副本, Follower не предоставляет внешние услуги. Вот как работает реплика Лидера

На этой картинке следует отметить следующие моменты

  • В Kafka ведомая копия, то есть ведомая копия, не предоставляет внешних сервисов. То есть ни одна из реплик-последователей не может отвечать на запросы потребителей и производителей. Все запросы обрабатываются репликой-лидером. Другими словами, все запросы должны быть отправлены брокеру, где находится копия лидера, а копия ведомого используется только для извлечения данных.异步拉取метод и записать его в свой журнал коммитов, чтобы добиться синхронизации с Лидером
  • Когда брокер, в котором находится копия-лидер, выходит из строя, Kafka может обнаружить это в режиме реального времени, полагаясь на функцию мониторинга, предоставляемую ZooKeeper, и начать новый раунд выборов, чтобы выбрать одну из копий-последователей в качестве лидера. Если отключенный брокер будет перезапущен, реплика раздела воссоединится в качестве ведомого.

Еще одна задача лидера — выяснить, состояние какого из последователей соответствует его собственному. Последователи пытаются реплицировать сообщения от лидера до поступления новых сообщений, чтобы поддерживать состояние, согласованное с лидером. В соответствии с лидером, ведомый делает запрос к лидеру для данных, которые являются той же самой информацией, которую потребитель отправляет, чтобы прочитать сообщение.

Процесс отправки ведомым сообщение ведущему выглядит следующим образом: сначала запрашивается сообщение 1, затем принимается сообщение 1, после запроса 1 отправляется запрос 2, перед получением ведущего и отправкой его ведомому ведомый не будет продолжать отправлять сообщения. Процесс выглядит следующим образом

Важно, чтобы реплики-последователи не продолжали отправлять сообщения, пока не получат ответное сообщение. Просматривая последнее смещение, запрошенное каждым последователем, лидер узнает о ходе репликации каждого последователя. Если подписчик не запрашивает никакого сообщения в течение 10 секунд или хотя подписчик отправил запрос, но не получил сообщение в течение 10 секунд, это считается不同步из. Если копия не синхронизируется с лидером, то эта копия не будет называться лидером после сброса лидера, потому что новости этой копии не все.

Напротив, если подписчики синхронизации сообщений и лидеры копии сообщения одинаковы, то эта копия также называется подписчиком.同步的副本. То есть, если лидер отброшен, лидером может называться только копия синхронизации.

Мы так много говорили о механизме копирования, так в чем же преимущества механизма копирования?

  • Вы можете сразу увидеть написанное сообщение, то есть после того, как вы успешно запишете сообщение в раздел с помощью API производителя, вы можете сразу же использовать потребитель для чтения только что написанного сообщения.
  • Что значит иметь возможность добиться идемпотентности сообщений? То есть для сообщения, сгенерированного производителем, когда потребитель потребляет, он будет видеть, что сообщение существует каждый раз, и не будет ситуации, когда сообщение не существует.

Синхронная и асинхронная репликация

Когда я изучал механизм копирования, у меня возник вопрос, так как ведущая копия и ведомая копия发送 - 等待Механически это метод синхронной репликации, так почему же это асинхронная операция, когда ведомая реплика синхронизирует ведущую реплику?

Думаю, дело в этом.После синхронизации копии лидера копия фолловера сохранит сообщение в локальном журнале.В это время фолловер отправит ответное сообщение копии лидера, сообщив лидеру, что оно было успешно сохранено, и лидер синхронной репликации. Производитель будет ждать, пока все реплики-последователи будут успешно записаны, а затем вернет сообщение об успешной записи производителю. В асинхронной репликации ведущей копии не нужно заботиться об успешной записи последующей копии.Пока ведущая копия сохраняет сообщение в локальный журнал, она возвращает успешно записанное сообщение производителю. Ниже приведен процесс синхронной репликации и асинхронной репликации.

синхронная репликация

  • Производитель уведомляет ZooKeeper для определения лидера
  • Продюсер пишет сообщения лидеру
  • После того, как лидер получит сообщение, он запишет сообщение в локальный журнал.
  • Последователи получают сообщения от лидера
  • Подписчик записывает журнал в локальный
  • Последователи отправляют успешное сообщение лидеру
  • Лидер будет получать все сообщения, отправленные подписчиками.
  • Лидер отправляет сообщение об успешной записи производителю

Асинхронная репликация

Отличие от синхронной репликации заключается в том, что лидер напрямую отправляет клиенту сообщение об успешной записи после записи в локальный журнал, не дожидаясь завершения репликации всеми последователями.

ISR

Kafka динамически поддерживает набор реплик с синхронизированным состоянием.(a set of In-Sync Replicas),简称ISR, ISR также очень важная концепция. Как мы уже говорили, копия подписчика не предоставляет услуги, но только периодически вытаскивает данные лидера копию асинхронически. Вытягивание этой операции эквивалентно копированию.ctrl-c + ctrl-vКаждый должен быть с ним знаком. Означает ли это, что количество сообщений-реплик в наборе ISR будет таким же, как и количество сообщений реплик-лидеров? Это не обязательно, решение основано на параметрах брокера.replica.lag.time.max.msЗначение этого параметра означает, что реплика-последователь может отставать от реплики-лидера на максимально длительный интервал времени.

По умолчанию время параметра replica.lag.time.max.ms равно 10 секунд. Если реплика-последователь отстает от реплики-лидера не более чем на 10 секунд, то Kafka считает лидера и ведомого синхронизированными. Даже если сообщения, хранящиеся в подчиненной реплике, в настоящее время меньше, чем в ведущей реплике. Если реплика-последователь отстает от реплики-лидера более чем на 10 секунд, реплика-последователь будет удалена из ISR. Если реплика медленно догоняет лидера, ее можно добавить обратно в ISR. Это также показывает, что ISR является динамически настраиваемым набором, а не статически инвариантным.

Нечистые выборы лидера

Так как ISR может динамически корректироваться, неизбежно будет пустой набор ISR.Поскольку ведущая копия должна появиться в наборе ISR, пустой набор ISR должен означать, что ведущая копия также мертва, поэтому в это время Kafka необходимо переустановить. -выбрать нового лидера, так как это сделать? Теперь вам нужно изменить свое мышление.Выше мы сказали, что реплики в наборе ISR должны быть синхронизированы с лидером, тогда реплики в наборе ISR, которого больше нет, должны быть репликами, которые не синхронизированы с лидером, то есть , они больше не отображаются в списке ISR.Мастер-реплика потеряет некоторые сообщения. Если открыть параметры на стороне брокераunclean.leader.election.enableЕсли да, то среди этих асинхронных реплик будет выбран следующий лидер. Эти выборы также называютсяUnclean 领导者选举.

Если вы были подвержены распределенным проектам, вы должны знать теорию CAP, то эти нечистые лидерские выборы фактически жертвуют согласованностью данных и обеспечивают высокую доступность Кафки.

Вы можете решить, включать ли нечистые выборы лидеров в соответствии с вашим реальным бизнес-сценарием.Как правило, не рекомендуется включать этот параметр, потому что согласованность данных важнее, чем доступность.

Процесс обработки запросов Kafka

Большая часть работы посредника заключается в обработке запросов от клиентов, реплик раздела и контроллеров к лидеру раздела. Этот запрос обычно请求/响应Другими словами, я предполагаю, что самым ранним запросом/ответом должен быть HTTP-запрос. На самом деле HTTP-запросы могут быть синхронными или асинхронными. Как правило, обычные HTTP-запросы являются синхронными. Самая большая особенность синхронного метода заключается в том, чтоОтправить запрос -> дождаться обработки сервером -> возврат после обработки В этот период клиентский браузер ничего не может сделать. Самая большая особенность асинхронного метода заключается в том, чтоЗапрос инициируется событием -> обрабатывается сервером (это то, что браузер все еще может делать другие вещи) -> обрабатывается.

Тогда я также могу сказать, что синхронные запросы обрабатываются последовательно, а асинхронные запросы выполняются неопределенным образом, потому что асинхронный требует создания нескольких потоков выполнения, и каждый поток выполняется в другом порядке.

Здесь следует отметить, что мы используем только HTTP-запросы в качестве примера, а Kafka использует связь на основе TCP-сокетов для связи.

Итак, каковы недостатки этих двух методов?

Я считаю, что умный должен сразу подумать, что самый большой недостаток метода синхронизации заключается в том, что吞吐量太差, использование ресурсов крайне низкое, поскольку запросы могут обрабатываться только последовательно, поэтому каждый запрос должен ожидать обработки предыдущего запроса, прежде чем он сможет быть обработан. Этот способ работает только请求发送非常不频繁的系统.

Недостатком асинхронного подхода является то, что создание потока для каждого запроса очень затратно, а в некоторых случаях может даже привести к перегрузке всей службы.

Отзывчивая модель

После стольких разговоров Кафка использует синхронный или асинхронный язык? Кроме того, Кафка использует响应式(Reactor)模型, так что же такое реактивная модель? Проще говоря,Режим Reactor — это реализация архитектуры, управляемой событиями, которая особенно подходит для сценариев, когда несколько клиентов одновременно отправляют запросы на сервер.,Как показано ниже

На стороне брокера Kafka есть компонент SocketServer, похожий на процессор.SocketServer – это сокет-соединение на основе TCP, которое используется для приема клиентских запросов.Все сообщения запросов содержат заголовок сообщения, а заголовок сообщения содержит следующую информацию.

  • Тип запроса (он же ключ API)
  • Версия запроса (брокер может обрабатывать запросы клиентов разных версий и отвечать по-разному в зависимости от версии клиента)
  • Идентификатор корреляции --- уникальный номер, который идентифицирует сообщение запроса, а также появляется в ответных сообщениях и журналах ошибок (для диагностики проблем).
  • Идентификатор клиента --- используется для идентификации клиента, отправившего запрос.

Брокер будет запускатьAcceptorпоток, этот поток создаст соединение и передаст егоProcessor(网络线程池), количество процессоров может быть использованоnum.network.threadsConfigure, значение по умолчанию равно 3, что означает, что каждый брокер будет создавать 3 потока, когда начнет обрабатывать запросы, отправленные клиентами.

Поток Acceptor будет использовать轮询Способ справедливой отправки push-запроса в пул сетевых потоков, поэтому в процессе фактического использования эти потоки обычно имеют одинаковую вероятность быть назначенными для отложенной обработки.请求队列, то из响应队列Получите ответные сообщения и отправьте их клиенту. Обработка запроса-ответа в пуле сетевых потоков процессора относительно сложна.Ниже представлена ​​блок-схема обработки в пуле сетевых потоков.

После того, как пул сетевых потоков процессора получит сообщение, отправленное клиентом и другими брокерами, пул сетевых потоков поместит сообщение в очередь запросов.共享请求队列Поскольку пул Network Thread представляет собой многопоточный механизм, сообщение очереди запроса представляет собой область, совместно которой разделяется несколькими потоками, а затем обработана пулом потока IO, который определяется в соответствии с типом сообщения, такого какPRODUCEзапрос, то сообщение будет записано в лог лог, если оноFETCHзапрос, сообщение считывается с диска или из кэша страниц. Другими словами, пул потоков ввода-вывода — это компонент, который действительно выносит суждения и обрабатывает запросы. После обработки пула потоков ввода-вывода будет определено, что он помещен в响应队列средний илиPurgatoryЧто такое Чистилище?Поговорим об этом ниже.Теперь поговорим об очереди ответов.Очередь ответов уникальна для каждого потока,так как адаптивной модели все равно куда отправляется запрос,поэтому ответ отправляется обратно в очередь для каждого потока нить, так что нет необходимости делиться.

Примечание. Пул потоков ввода-вывода может передавать параметры на стороне брокера.num.io.threadsЧтобы настроить, номер потока по умолчанию составляет 8, который автоматически создает восемь потоков обработки IO после каждого запуска брокера.

тип запроса

Вот несколько распространенных типов запросов

производственный запрос

я здесьДействительно, достаточно прочитать эту статью о том, как начать работу с Кафкой.упоминается в статьеacksЗначение этого элемента конфигурации

Проще говоря, разные конфигурации имеют разные определения успеха записи: если acks = 1, то до тех пор, пока лидер получает сообщение, запись успешна, если acks = 0, это означает, что запись успешна, пока лидер отправляет сообщение Успех, независимо от влияния возвращаемого значения. Если acks = all, это означает, что лидер должен получать сообщения от всех реплик, прежде чем запись будет успешной.

После того, как сообщение будет записано лидеру раздела, если значение конфигурации acks равноall, то эти запросы будут храниться в炼狱(Purgatory)Ответ не будет отправлен клиенту, пока ведущая реплика не обнаружит, что последующие реплики реплицировали сообщение.

получить запрос

То, как брокер получает запрос, аналогично способу обработки производственного запроса. Клиент отправляет запрос и запрашивает у брокера сообщение по определенному смещению в разделе темы. Если смещение существует, Kafka будет использовать零复制Технология отправляет сообщение клиенту, а Kafka напрямую отправляет сообщение из файла в сетевой канал, минуя какие-либо буферы, что приводит к повышению производительности.

Клиент может установить верхний и нижний пределы данных запроса,上限Это относится к объему памяти, выделенному клиентом для получения достаточного количества сообщений. Этот предел более важен. Если верхний предел слишком велик, это может привести к прямому исчерпанию памяти клиента.下限Это можно понимать как сохранение достаточного количества пакетов данных и их последующая отправка.Это эквивалентно тому, что менеджер проекта назначает программисту 10 ошибок.Каждый раз, когда программист изменяет ошибку, он сообщает об этом менеджеру проекта.Это может не исправить вовремя, что увеличит стоимость связи и времени, поэтому нижний предел - это то, что программист должен сообщить мне после того, как вы исправите 10 ошибок! ! ! Как показано ниже

Как видно на рисунке, в拉取消息 ---> 消息Существует процесс ожидания накопления сообщений.Вы можете рассматривать это накопление сообщений как период тайм-аута, но по истечении тайм-аута произойдет исключение, и ответ на квитанцию ​​будет получен после тайм-аута накопления сообщений. Время задержки можно пройти черезreplica.lag.time.max.msНастроен, он указывает, что максимальная копия сообщения может быть разрешена при копировании времени задержки.

запрос метаданных

И производственные запросы, и запросы ответа должны быть отправлены на реплику лидера.Если брокер получает запрос для определенного раздела, а лидер запроса находится в другом брокере, клиент, отправляющий запрос, получит非分区首领ошибка ответа; такая же ошибка возникает, если запрос на раздел отправляется брокеру, у которого нет лидера. Клиенты Kafka должны отправлять запросы и ответы правильному брокеру. Разве это не ерунда? Как узнать, куда отправить?

На самом деле клиент будет использовать元数据请求, запрос такого типа будет содержать список тем, которые интересуют клиента, а ответное сообщение сервера указывает на раздел темы, ведущую копию и ведомую копию. Запросы метаданных можно отправлять любому брокеру, поскольку все брокеры кэшируют эту информацию.

При нормальных обстоятельствах клиент будет кэшировать эту информацию и отправлять запросы на производство и соответствующие запросы непосредственно к целевому брокеру. Эти кэши должны быть обновлены с интервалами, используяmetadata.max.age.msПараметры для настройки, поэтому вы знаете, изменятся ли метаданные. Например, после добавления нового брокера он будет вызвать тяжелый баланс, частичная реплика перейдет в новый брокер. В это время, если клиент получает不是首领ошибка, клиент очищает кеш метаданных перед отправкой запроса.

Процесс ребалансировки Кафки

я здесьДействительно, достаточно прочитать эту статью о том, как начать работу с Кафкой.В описании потребителей я вкратце рассказал о взаимосвязи между группами потребителей и ребалансировке, по сути, это можно свести к одному пункту: пусть все экземпляры потребителей в группе договариваются о том, какие тематические разделы потреблять.

Мы знаем, что группа потребителей должна иметь群组协调者(Coordinator), В то время как весовой баланс процесса завершен с помощью координатора вниз.

Здесь нам нужно объявить условия для перебалансировки, чтобы войти в первую очередь.

  • Любая тема, на которую подписан потребитель, меняется
  • Изменение количества потребителей
  • Количество разделов изменилось
  • Если вы подписываетесь на тему, которая еще не создана, то ребалансировка происходит при создании темы. Ребалансировка также произойдет, если тема, на которую вы подписаны, будет удалена.
  • Потребитель рассматривается координатором группы какDEADсостояние, которое может произойти из-за сбоя потребителя или нахождения в рабочем состоянии в течение длительного периода времени, что означает, что в течение разумно сконфигурированного периода времени потребитель не отправил координатору группы никаких тактов, что также может вызвать произойдет ребалансировка.

Прежде чем понимать балансировочный вес, вам нужно знать об этих двух ролях.

群组协调器(Coordinator): Координатор группы — это посредник, который может получать контрольные сообщения от всех потребителей в группе потребителей. В самых ранних версиях информация о метаданных хранилась в ZooKeeper, но в настоящее время информация о метаданных хранится в брокере. Каждая группа потребителей должна быть синхронизирована с координатором группы в группе. Когда все решения должны приниматься в узлах приложения, координатор группы может удовлетворитьJoinGroupЗапрашивает и предоставляет метаданные о группах потребителей, такие как распределения и смещения. Координатор группы также имеет право знать сердцебиение всех потребителей.Еще одна роль в группе потребителей – лидер.Пожалуйста, отличите ее от копии лидера и контроллера кафки. Лидер принимает решения в группе, поэтому, если лидер выходит из строя, координатор группы имеет право выгнать всех потребителей из группы. Следовательно, очень важным поведением группы потребителей является выбор лидера и чтение и запись информации метаданных о распределении и разделах с координатором.

消费者领导者: В каждой потребительской группе есть лидер. Если потребитель перестанет отправлять пульсации, координатор активирует перебалансировку.

Прежде чем понять ребалансировку, вам нужно знать, что такое конечный автомат.

Кафка разработал множество消费者组状态机(State Machine), чтобы помочь координатору завершить весь процесс перебалансировки. Машина состояний потребителя в основном имеет пять состояний, которыеEmpty, Dead, PreparingRebalance, CompletingRebalance и Stable.

Поняв значение этих состояний, давайте воспользуемся несколькими путями для представления ротации потребительских состояний.

Группа потребителей начинается вEmptyсостояние, когда ребалансировка включена, он будет помещен вPreparingRebalanceГосударство ждет, когда присоединятся новые потребители.CompletingRebalanceСостояние ожидает распределения. Пока новый потребитель присоединяется к группе или покидает ее, будет запущена перебалансировка, а состояние потребителя находится в состоянии PreparingRebalance. Подождите, пока будет указан механизм распределения, и завершите выделение, тогда его блок-схема выглядит следующим образом.

На основе приведенного выше рисунка, когда все группы потребителей прибываютStableсостояние, как только появится новыйИстек срок присоединения/отключения/пульса потребителей, затем запускается повторная балансировка, и состояние группы потребителей снова находится в состоянии PreparingRebalance. Тогда его блок-схема выглядит так.

Основываясь на приведенном выше рисунке, после того, как группа потребителей находится в состоянии PreparingRebalance, к сожалению, никто не играет, и все потребители ушли.В это время данные о смещении, потребляемые потребителями, все еще могут быть сохранены.После истечения срока действия данных о смещении или обновляется, то группа потребителей находится вDeadСостояние. Схема у него такая

На основе приведенного выше рисунка мы анализируем ребалансировку потребителей, вPreparingRebalanceилиCompletingRebalanceилиStableВ любом состоянии при смене лидера тематического раздела смещения группа будет находиться непосредственно в состоянии Dead, а все ее пути следующие:

Здесь следует отметить две вещи:

обычно появляютсяRequired xx expired offsets in xxx millisecondsЭто означает, что Кафка, скорее всего, удалит данные смещения группы

Только группа в состоянии Empty будет выполнять операцию удаления просроченного смещения.

процесс ребалансировки

Выше мы узнали о процессе преобразования статуса группы потребителей, давайте действительно начнем знакомитьсяRebalanceпроцесс. Процесс ребалансировки можно рассматривать с двух сторон: со стороны потребителя и со стороны координатора.Во-первых, давайте посмотрим на сторону потребителя.

Обратите внимание на баланс от потребителей

Есть два шага для потребителей, чтобы оценить баланс:消费者加入组и等待领导者分配方案. Соответствующие запросы после этих двух шаговJoinGroupиSyncGroup.

Когда новый потребитель присоединяется к группе, потребитель отправляет сообщение координаторуJoinGroupпросить. В этом запросе каждый участник-потребитель должен представить тему, которую он потребляет.Как мы упоминали выше в описании координатора группы, цель этого состоит в том, чтобы позволить координатору собрать достаточно информации метаданных для выбора потребления лидера группы. . Как правило, первый потребитель, отправивший запрос на присоединение к группе, автоматически называется лидером.Задача лидера состоит в том, чтобы собрать информацию о подписке для всех участников, а затем на основе этой информации разработать распределение конкретных разделов потребления.. Фигура

После того, как все потребители присоединились и передали информацию о метаданных лидеру, лидер составляет план распределения и отправляетSyncGroupЗапрос отправляется координатору, и координатор отвечает за выдачу политики потребления в группе. На следующей диаграмме показан процесс запроса SyncGroup.

Когда все участники успешно получили план распределения, группа потребителей переходит в стабильное состояние, то есть начинается нормальная работа по потреблению.

Ребалансировка с точки зрения координатора

С точки зрения координатора, ребалансировка в основном имеет следующие условия срабатывания:

  • Новый участник присоединяется к группе
  • Члены группы уходят добровольно
  • Члены группы терпят крах и уходят
  • Члены группы отправляют смещения

Опишем их отдельно, начиная с новых участников, вступающих в группу.

#### Новые участники присоединяются к группе

Мы обсудим сценарий, в котором состояние потребительского кластера находится вStableОжидание процесса распределения, в это время, если новый участник присоединяется к группе, процесс ребалансировки

С этой точки зрения координатор процессов и потребители похожи, но просто посмотрите с точки зрения потребителей, а теперь посмотрите с точки зрения лидеров.

Члены группы уходят

Член группы, покидающий группу потребителей, обращается к экземпляру потребителя, вызывающемуclose()Метод активно информирует координатора о том, что он собирается выйти. Вот и новый запросLeaveGroup()请求. Как показано ниже

авария участника группы

Сбой члена группы означает, что у экземпляра-потребителя произошел серьезный сбой, он не работает или не отвечает в течение определенного периода времени, и координатор не может получить пульс от потребителя, это будет рассматриваться как сбой.组成员崩溃,崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。 Как показано ниже

Совершить смещение при перебалансировке

Мы больше не будем использовать графику для представления этого процесса.Общее описание состоит в том, что после того, как потребитель отправит запрос на присоединение к группе, потребители в группе должны отправить свои соответствующие смещения в течение указанного диапазона времени, а затем запустить обычный запрос на присоединение к группе/синхронной группе. Отправить.

Если вы согласны со мной, пожалуйста, поставьте мне лайк, спасибо. Увидимся в нашей следующей технической статье.

Ссылка на статью:

Полное руководство по Кафке

blog.csdn.net/u013256816/...

обучение. О, Reilly.com/library/VIE...

Блог woohoo.cn на.com/Kevin Grace/…

woo woo woo.cn blog on.com/breath 2 not/afraid/69…

«Geek Time — основные технологии и практика Kafka»

Из wiki.Apache.org/confluence/…

Из wiki.Apache.org/confluence/…

разделы и реплики kafka, процесс выполнения kafaka и высокая доступность сообщений

Синхронные и асинхронные запросы в Http

Подробное объяснение режима Reactor

kafka.apache.org/document ATI…

Woohoo.LinkedIn.com/pulse/Страх перед человеческим телом…

Из wiki.Apache.org/confluence/…