Интервьюер: Расскажите мне обо всем процессе обработки запросов Kafka.

Kafka

Всем привет, меня зовут да.

Это моя третья статья об анализе исходного кода Kafka.Первые две статьи говорили оЧтение и запись сегментов журналаа такжеПрименение бинарного алгоритма к индексу kafka

давай поговорим сегодняKafka BrokerВесь процесс обработки запросов в конце анализирует, как реализована базовая сетевая коммуникация, и приложение Reactor на kafka.

Давайте поговорим о том, почему сообщество разделило типы запросов на две категории в версии 2.3 и как реализовать приоритет двух типов обработки запросов. #лепет Однако, прежде чем перейти к сегодняшней теме, я хотел бы сказать несколько слов: разные люди по-разному относятся к исходному коду.

Некоторые люди пугаются, когда слышат слово «исходный код» Как вы себе представляете столько кода? Вбегает, как безголовая муха, все время следует за точкой останова, прыгает и пока-пока.

А некоторые считают, что исходный код бесполезен, если вы его прочтете, то не сможете им пользоваться.

На самом деле у меня есть обе мысли выше, хахаха. Тогда зачем мне смотретьKafkaЧто насчет исходного кода?

На самом деле, у меня есть коллега, который преподает сам.go, а затем хотите использовать go для записи очереди сообщений, спросите меня, когда я рисую архитектурную диаграмму, в этой очереди сообщений, кажется, что-то есть, отправка и получение сообщений, управление метаданными, как сохранить кучу вопросов, я не могу терпеть. .

этот рынокKafka,RocketMQЭто все готовые решения, поэтому я посмотрел исходный код.

Итак, первоначальная мотивация, побудившая меня взглянуть на исходный код, заключалась в том, чтобы притвориться перед моими коллегами! !

я первый посмотрелRocketMQ, потому что в конце концов этоJavaнаписано, покаKafka Brokerобеscalaнаписано.

прочесалRocketMQПосле этого я хочу увидетьKafkaКак ты это сделал, так что я посмотрел еще разKafka.

Перед анализом исходного кода я сделаю краткое заявление.KafkaБазовая модель общения. Отвечайте на вопросы интервьюераKafkaДостаточно запросить весь процесс.

Реакторный режим

потянув заKafkaПрежде чем мы скажемReactor模式, в основном до тех пор, пока базовая высокопроизводительная сетевая связь неотделимаReactor模式. Как и Netty, Redis используютсяReactor模式.

Как и когда мы только что изучали сетевое программирование, следующий код очень знаком: новый запрос либо обрабатывается непосредственно в текущем потоке, либо для его обработки создается новый поток.

В первые дни такое программирование не было проблемой, но с быстрым развитием Интернета однопоточная обработка не может быть выполнена, и ресурсы компьютера не могут быть полностью использованы.

И каждый запрос обрабатывается новым потоком, требования к ресурсам слишком высоки, а создание потока — тоже тяжелая операция.

Когда дело доходит до этого, кто-то думал об этом, разве недостаточно построить пул потоков, что еще нужно?Reactor.

Технология пула действительно может облегчить проблему ресурсов, но пул ограничен, и поток в пуле все равно должен ждать подключения и инструкции. Эпоха Интернета уже прорваласьC10K.

поэтому введенIO多路复用,Зависит отПоток для мониторинга множества подключений, синхронно дождаться прихода одного или нескольких событий ввода-вывода, а затем распределить события по соответствующимHandlerобработка, это называетсяReactor模式.

Развитие модели сетевой коммуникации происходит следующим образом.

Один поток => Многопоток => Пул потоков => Модель реактора

Кафка использовалReactor模型следующим образом

图来自Doug Lea大神的 Scalable IO in Java

Модель сетевой коммуникации Kafka Broker

Проще говоря, есть БрокерAcceptor(mainReactor)Прислушивайтесь к прибытию новых соединений и опрашивайте, чтобы выбрать одно после установления соединения с новым соединением.Processor(subReactor)управлять этим соединением.

а такжеProcessorБудет прослушивать соединение, которым он управляет, и когда наступит событие, считать пакет вRequest, и воляRequestПоместить в общую очередь запросов.

Затем пул потоков ввода-вывода непрерывно берет запросы из очереди и выполняет реальную обработку. После обработки отправить ответ на соответствующийProcessorочередь ответов, затемProcessorбудетResponseвозвращен клиенту.

каждыйlistenerединственныйAcceptor线程, потому что он перераспределяется только как новое соединение, логики лишней нет, он очень легковесный, и одного достаточно.

ProcessorВ Kafka это называется сетевым потоком.Пул сетевых потоков по умолчанию имеет 3 потока, и соответствующие параметрыnum.network.threads. И может динамически увеличиваться или уменьшаться в зависимости от фактического бизнеса.

Существует также пул потоков ввода-вывода, а именноKafkaRequestHandlerPool, чтобы выполнить реальную обработку, соответствующие параметрыnum.io.threads, значение по умолчанию — 8. После обработки потока ввода-вывода онResponseпоставить в соответствующийProcessorв, поProcessorВерните ответ клиенту.

Вы можете увидеть классический шаблон производитель-потребитель, используемый между сетевыми потоками и потоками ввода-вывода, будь то общая очередь запросов для обработки запросов или ответов, возвращаемых после обработки ввода-вывода.

Какая польза от этого? Производитель и потребитель отделены друг от друга, и в производителя или потребителя могут быть внесены независимые изменения и расширения. И это может сбалансировать вычислительную мощность двух.Например, если потребления недостаточно, я добавлю больше потоков ввода-вывода.

Если вы посмотрите на другой исходный код промежуточного программного обеспечения, вы обнаружите, что модель производитель-потребитель действительно слишком распространена, поэтому вопросы интервью часто содержат рукописную волну производитель-потребитель.

Анализ моделей сетевых коммуникаций на уровне источника

Компоненты сетевой связи Kafka в основном состоят из двух частей:SocketServerа такжеKafkaRequestHandlerPool.

##SocketServer

Как можно заметитьSocketServerпод управлением,Acceptor 线程,Processor 线程а такжеRequestChannelи т.п. объекты.

data-planeа такжеcontrol-planeАнализ сделаю позже, сначала посмотрюRequestChannelчто.

RequestChannel

Ключевые свойства и методы были аннотированы в следующем коде, видно, что этот объект в основномуправлятьProcessorа такжекак передачаRequestа такжеResponseпересадочная станция.

## Акцептор Далее мы смотрим наAcceptor

Вы можете видеть, что он наследуетAbstractServerThread, давайте посмотрим, что он работает

посмотри сноваaccept(key)что сделал

очень простой, стандартныйselectorобработка, получить готовое мероприятие, позвонитьserverSocketChannel.accept()получитьsocketChannel,БудуsocketChannelпередано тому, кто был выбран голосованиемProcessor, а затем обрабатывает события ввода-вывода. ##Процессор Далее мы смотрим наProcessor, в сравнении сAcceptorНемного сложнее.

Давайте взглянем на трех ключевых членов

Рассмотрим основную логику обработки.

можно увидетьProcessorВ основном для инкапсуляции базовых данных ввода/вывода события чтения вRequestСохраняется в очереди, а затем вставляется в поток ввода-выводаResponse, возвращается клиенту и обрабатываетсяResponseлогика обратного вызова.

#KafkaRequestHandlerPool

Пул потоков ввода-вывода, поток, который фактически обрабатывает запрос.

Давайте посмотрим, что делает поток ввода-вывода

Очень просто, ядро ​​​​должно непрерывноrequestChannelВозьмите запрос, а затем вызовите handle для обработки запроса.

handleметод находится по адресуKafkaApisкласс, который можно понимать как прохождениеswitch, по разному указанному в заголовке запросаapikeyназывай разныеhandleдля обработки запроса.

Возьмем пример более простого процессаLIST_OFFSETSпроцесс, то естьhandleListOffsetRequest, чтобы завершить замкнутый цикл запроса.

Цепочку вызовов я отметил красной стрелкой. Указывает, что после обработки запроса он забивается в соответствующийProcessorиз.

Наконец, добавляется более подробная обзорная карта, и в основном соответственно добавляются классы, анализируемые исходным кодом.

#Приоритет обработки запроса упомянутый вышеdata-planeа такжеcontrol-planeВремя поднять завесу. Эти два соответствуют запросу класса данных и запросу класса управления.

Почему существует два типа запросов? Можно ли использовать ключ непосредственно в запросе, чтобы указать, предназначен ли запрос для чтения и записи данных или для обновления метаданных?

Проще говоря, например, если мы хотим удалить тему, мы обязательно хотим, чтобы эта тема была удалена немедленно, а в это время производитель записывает данные в эту тему, то такая ситуация может заключаться в том, что наш запрос на удаление занял N-е место... Подождите, пока предыдущий запрос на запись не будет обработан, прежде чем наступит очередь запроса на удаление. На самом деле предыдущие запросы, написанные в эту тему, бесполезны и зря потребляют ресурсы.

или продолжайPreferred LeaderВо время выборов,producerбудетackУстановить какallкогда,老leaderвсе еще ждуfollowerНаписав данные, сообщите ему, кто знаетfollowerстал新leader, а запрос на уведомление о смене лидера блокируется кучей запросов типа данных,老leaderПросто глупо ждать, пока это время истечет.

Именно для решения этой ситуации сообщество делит запросы на две категории.

Как сделать так, чтобы запрос управляющего класса обрабатывался первым? Приоритетная очередь?

Сообщество взяло два комплектаListener, то есть тип данных alistener, класс управленияlistener.

В соответствии с тем, что мы сказали вышеСетевая коммуникационная модель, в кафке два набора!Kafka реализует приоритет запроса через два замаскированных набора мониторинга.В конце концов, должно быть много запросов типов данных и должно быть мало управляющих классов, поэтому кажется, что управляющие классы должны обрабатываться перед большинством типов данных!

Тактика обхода.

Разница между классом управления и классом данных в том, что только одинPorcessor线程, а длина очереди запросов на запись составляет 20.

наконец-то

Главное, чтобы смотреть на исходный код, — это быть терпеливым и терпеливо следовать ему. Тогда выпрыгивай и смотри. Вы обнаружите, что это не так, хахаха.

Я да, человек-инструмент, который борется в Интернете и не имеет эмоций.