Всем привет, меня зовут да.
Это моя третья статья об анализе исходного кода Kafka.Первые две статьи говорили оЧтение и запись сегментов журналаа такжеПрименение бинарного алгоритма к индексу kafka
давай поговорим сегодняKafka Broker
Весь процесс обработки запросов в конце анализирует, как реализована базовая сетевая коммуникация, и приложение Reactor на kafka.
Давайте поговорим о том, почему сообщество разделило типы запросов на две категории в версии 2.3 и как реализовать приоритет двух типов обработки запросов. #лепет Однако, прежде чем перейти к сегодняшней теме, я хотел бы сказать несколько слов: разные люди по-разному относятся к исходному коду.
Некоторые люди пугаются, когда слышат слово «исходный код» Как вы себе представляете столько кода? Вбегает, как безголовая муха, все время следует за точкой останова, прыгает и пока-пока.
А некоторые считают, что исходный код бесполезен, если вы его прочтете, то не сможете им пользоваться.
На самом деле у меня есть обе мысли выше, хахаха. Тогда зачем мне смотретьKafka
Что насчет исходного кода?
На самом деле, у меня есть коллега, который преподает сам.go
, а затем хотите использовать go для записи очереди сообщений, спросите меня, когда я рисую архитектурную диаграмму, в этой очереди сообщений, кажется, что-то есть, отправка и получение сообщений, управление метаданными, как сохранить кучу вопросов, я не могу терпеть. .
этот рынокKafka
,RocketMQ
Это все готовые решения, поэтому я посмотрел исходный код.
Итак, первоначальная мотивация, побудившая меня взглянуть на исходный код, заключалась в том, чтобы притвориться перед моими коллегами! !
я первый посмотрелRocketMQ
, потому что в конце концов этоJava
написано, покаKafka Broker
обеscala
написано.
прочесалRocketMQ
После этого я хочу увидетьKafka
Как ты это сделал, так что я посмотрел еще разKafka
.
Перед анализом исходного кода я сделаю краткое заявление.Kafka
Базовая модель общения. Отвечайте на вопросы интервьюераKafka
Достаточно запросить весь процесс.
Реакторный режим
потянув заKafka
Прежде чем мы скажемReactor模式
, в основном до тех пор, пока базовая высокопроизводительная сетевая связь неотделимаReactor模式
. Как и Netty, Redis используютсяReactor模式
.
Как и когда мы только что изучали сетевое программирование, следующий код очень знаком: новый запрос либо обрабатывается непосредственно в текущем потоке, либо для его обработки создается новый поток.
В первые дни такое программирование не было проблемой, но с быстрым развитием Интернета однопоточная обработка не может быть выполнена, и ресурсы компьютера не могут быть полностью использованы.
И каждый запрос обрабатывается новым потоком, требования к ресурсам слишком высоки, а создание потока — тоже тяжелая операция.
Когда дело доходит до этого, кто-то думал об этом, разве недостаточно построить пул потоков, что еще нужно?Reactor
.
Технология пула действительно может облегчить проблему ресурсов, но пул ограничен, и поток в пуле все равно должен ждать подключения и инструкции. Эпоха Интернета уже прорваласьC10K
.
поэтому введенIO多路复用
,Зависит отПоток для мониторинга множества подключений, синхронно дождаться прихода одного или нескольких событий ввода-вывода, а затем распределить события по соответствующимHandler
обработка, это называетсяReactor模式
.
Развитие модели сетевой коммуникации происходит следующим образом.
Один поток => Многопоток => Пул потоков => Модель реактора
Кафка использовалReactor模型
следующим образом
Модель сетевой коммуникации Kafka Broker
Проще говоря, есть БрокерAcceptor(mainReactor)
Прислушивайтесь к прибытию новых соединений и опрашивайте, чтобы выбрать одно после установления соединения с новым соединением.Processor(subReactor)
управлять этим соединением.
а такжеProcessor
Будет прослушивать соединение, которым он управляет, и когда наступит событие, считать пакет вRequest
, и воляRequest
Поместить в общую очередь запросов.
Затем пул потоков ввода-вывода непрерывно берет запросы из очереди и выполняет реальную обработку. После обработки отправить ответ на соответствующийProcessor
очередь ответов, затемProcessor
будетResponse
возвращен клиенту.
каждыйlistener
единственныйAcceptor线程
, потому что он перераспределяется только как новое соединение, логики лишней нет, он очень легковесный, и одного достаточно.
Processor
В Kafka это называется сетевым потоком.Пул сетевых потоков по умолчанию имеет 3 потока, и соответствующие параметрыnum.network.threads
. И может динамически увеличиваться или уменьшаться в зависимости от фактического бизнеса.
Существует также пул потоков ввода-вывода, а именноKafkaRequestHandlerPool
, чтобы выполнить реальную обработку, соответствующие параметрыnum.io.threads
, значение по умолчанию — 8. После обработки потока ввода-вывода онResponse
поставить в соответствующийProcessor
в, поProcessor
Верните ответ клиенту.
Вы можете увидеть классический шаблон производитель-потребитель, используемый между сетевыми потоками и потоками ввода-вывода, будь то общая очередь запросов для обработки запросов или ответов, возвращаемых после обработки ввода-вывода.
Какая польза от этого? Производитель и потребитель отделены друг от друга, и в производителя или потребителя могут быть внесены независимые изменения и расширения. И это может сбалансировать вычислительную мощность двух.Например, если потребления недостаточно, я добавлю больше потоков ввода-вывода.
Если вы посмотрите на другой исходный код промежуточного программного обеспечения, вы обнаружите, что модель производитель-потребитель действительно слишком распространена, поэтому вопросы интервью часто содержат рукописную волну производитель-потребитель.
Анализ моделей сетевых коммуникаций на уровне источника
Компоненты сетевой связи Kafka в основном состоят из двух частей:SocketServerа такжеKafkaRequestHandlerPool.
##SocketServer
Как можно заметитьSocketServer
под управлением,Acceptor 线程
,Processor 线程
а такжеRequestChannel
и т.п. объекты.
data-plane
а такжеcontrol-plane
Анализ сделаю позже, сначала посмотрюRequestChannel
что.
RequestChannel
Ключевые свойства и методы были аннотированы в следующем коде, видно, что этот объект в основномуправлятьProcessor
а такжекак передачаRequest
а такжеResponse
пересадочная станция.
## Акцептор
Далее мы смотрим наAcceptor
Вы можете видеть, что он наследуетAbstractServerThread
, давайте посмотрим, что он работает
accept(key)
что сделал
очень простой, стандартныйselector
обработка, получить готовое мероприятие, позвонитьserverSocketChannel.accept()
получитьsocketChannel
,БудуsocketChannel
передано тому, кто был выбран голосованиемProcessor
, а затем обрабатывает события ввода-вывода.
##Процессор
Далее мы смотрим наProcessor
, в сравнении сAcceptor
Немного сложнее.
Давайте взглянем на трех ключевых членов
Рассмотрим основную логику обработки.
можно увидетьProcessor
В основном для инкапсуляции базовых данных ввода/вывода события чтения вRequest
Сохраняется в очереди, а затем вставляется в поток ввода-выводаResponse
, возвращается клиенту и обрабатываетсяResponse
логика обратного вызова.
#KafkaRequestHandlerPool
Пул потоков ввода-вывода, поток, который фактически обрабатывает запрос.
Давайте посмотрим, что делает поток ввода-вывода
Очень просто, ядро должно непрерывноrequestChannel
Возьмите запрос, а затем вызовите handle для обработки запроса.
handle
метод находится по адресуKafkaApis
класс, который можно понимать как прохождениеswitch
, по разному указанному в заголовке запросаapikey
называй разныеhandle
для обработки запроса.
Возьмем пример более простого процессаLIST_OFFSETS
процесс, то естьhandleListOffsetRequest
, чтобы завершить замкнутый цикл запроса.
Цепочку вызовов я отметил красной стрелкой. Указывает, что после обработки запроса он забивается в соответствующийProcessor
из.
Наконец, добавляется более подробная обзорная карта, и в основном соответственно добавляются классы, анализируемые исходным кодом.
#Приоритет обработки запроса
упомянутый вышеdata-plane
а такжеcontrol-plane
Время поднять завесу. Эти два соответствуют запросу класса данных и запросу класса управления.
Почему существует два типа запросов? Можно ли использовать ключ непосредственно в запросе, чтобы указать, предназначен ли запрос для чтения и записи данных или для обновления метаданных?
Проще говоря, например, если мы хотим удалить тему, мы обязательно хотим, чтобы эта тема была удалена немедленно, а в это время производитель записывает данные в эту тему, то такая ситуация может заключаться в том, что наш запрос на удаление занял N-е место... Подождите, пока предыдущий запрос на запись не будет обработан, прежде чем наступит очередь запроса на удаление. На самом деле предыдущие запросы, написанные в эту тему, бесполезны и зря потребляют ресурсы.
или продолжайPreferred Leader
Во время выборов,producer
будетack
Установить какall
когда,老leader
все еще ждуfollower
Написав данные, сообщите ему, кто знаетfollower
стал新leader
, а запрос на уведомление о смене лидера блокируется кучей запросов типа данных,老leader
Просто глупо ждать, пока это время истечет.
Именно для решения этой ситуации сообщество делит запросы на две категории.
Как сделать так, чтобы запрос управляющего класса обрабатывался первым? Приоритетная очередь?
Сообщество взяло два комплектаListener
, то есть тип данных alistener
, класс управленияlistener
.
В соответствии с тем, что мы сказали вышеСетевая коммуникационная модель, в кафке два набора!Kafka реализует приоритет запроса через два замаскированных набора мониторинга.В конце концов, должно быть много запросов типов данных и должно быть мало управляющих классов, поэтому кажется, что управляющие классы должны обрабатываться перед большинством типов данных!
Тактика обхода.
Разница между классом управления и классом данных в том, что только одинPorcessor线程
, а длина очереди запросов на запись составляет 20.
наконец-то
Главное, чтобы смотреть на исходный код, — это быть терпеливым и терпеливо следовать ему. Тогда выпрыгивай и смотри. Вы обнаружите, что это не так, хахаха.
Я да, человек-инструмент, который борется в Интернете и не имеет эмоций.