Кафка, которую ты должен знать

Java задняя часть Kafka

1 Обзор

Первоначально Apache Kafka была распределенной системой обмена сообщениями с открытым исходным кодом от LinkedIn, но теперь она является подпроектом Apache и стала одной из наиболее широко используемых систем обмена сообщениями в области с открытым исходным кодом. Сообщество Kafka очень активно, и начиная с версии 0.9 слоган Kafka изменился с «распределенной системы обмена сообщениями с высокой пропускной способностью» на «распределенную потоковую платформу».

Kafka отличается от традиционных систем обмена сообщениями:

  • Kafka — это распределенная система, которую легко масштабировать.

  • Он обеспечивает высокую пропускную способность как для публикации, так и для подписки.

  • Он поддерживает несколько подписчиков и автоматически балансирует потребителей в случае сбоя.

  • Сохранение сообщений

Сравнение кафки и других очередей сообщений:

 

kafka

activemq

rabbitmq

rocketmq

задний план

Kafka — это высокопроизводительная распределенная система обмена сообщениями, разработанная LinkedIn, которая широко используется в таких сценариях, как сбор журналов, потоковая обработка данных, онлайн- и офлайн-рассылка сообщений и т. д.

ActiveMQ ActiveMQ — это промежуточное программное обеспечение с открытым исходным кодом, ориентированное на сообщения (MOM), которое реализует спецификацию JMS1.1, предоставляя приложениям эффективную, масштабируемую, стабильную и безопасную передачу сообщений на уровне предприятия.

RabbitMQ — это реализация протокола AMQP (Advanced Message Queue) с открытым исходным кодом, разработанная erlang.

RocketMQ — промежуточное ПО для обмена сообщениями с открытым исходным кодом от Alibaba, выпущенное в 2012 году. Оно было передано в дар Apache Foundation и в ноябре 2016 года стало инкубационным проектом Apache.

Язык разработки

java,scala

Java

Erlang

Java

Поддержка протокола

Комплект договоров, составленный самостоятельно

JMS-протокол

AMQP

JMS, MQTT

Постоянная поддержка

служба поддержки

служба поддержки

служба поддержки

служба поддержки

Сопровождение сделки

Поддерживается после 0.11.0

служба поддержки

служба поддержки

служба поддержки

отказоустойчивость производителя

Опция конфигурации ack предоставляется в kafka,

request.required.acks=-1, самый низкий уровень, производителю не нужно заботиться об успешной передаче

request.required.acks=0, нужен только ведущий раздел

request.required.acks=1, все элементы в наборе isr синхронизируются перед возвратом

Может иметь повторяющиеся данные

Повторите попытку после отправки

с моделью акка

модель ack может дублировать сообщения

Модель транзакции гарантирует полную согласованность

похож на кафку

пропускная способность

Kafka обладает высокой пропускной способностью, внутренней пакетной обработкой сообщений, механизмом нулевого копирования, хранением и сбором данных являются последовательные пакетные операции на локальном диске со сложностью O(1), а эффективность обработки сообщений очень высока.


RabbitMQ немного уступает Kafka по пропускной способности.Отправные точки у них разные. RabbitMQ поддерживает надежную доставку сообщений, поддерживает транзакции и не поддерживает пакетные операции;исходя из требований надежности хранения, память или жесткий диск могут использоваться для место хранения.

Kafka имеет более высокую пропускную способность, чем RocketMq, когда количество тем небольшое, а RocketMq выше, чем kafka, когда количество тем большое.

балансировки нагрузки

Kafka использует zookeeper для управления брокерами и потребителями в кластере и может регистрировать темы в zookeeper; с помощью механизма координации zookeeper производитель сохраняет информацию брокера о соответствующей теме, которая может быть отправлена ​​​​брокеру случайным образом или путем опроса; и производитель может быть указан на основе семантики Фрагментация, сообщение отправляется фрагменту брокера

 

Для балансировки нагрузки RabbitMQ требуется отдельный балансировщик нагрузки для поддержки

NamerServer для балансировки нагрузки

2. Вводный пример

2.1 Производители

producer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserKafkaProducer extends Thread
{
    private final KafkaProducer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();
    public UserKafkaProducer(String topic)
    {
        props.put("metadata.broker.list", "localhost:9092");
        props.put("bootstrap.servers", "master2:6667");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<Integer, String>(props);
        this.topic = topic;
    }
    @Override
    public void run() {
        int messageNo = 1;
        while (true)
        {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
			//返回的是Future<RecordMetadata>,异步发送
            producer.send(new ProducerRecord<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2.2 Потребители

Properties props = new Properties();
/* 定义kakfa 服务的地址,不需要将所有broker指定上 */
props.put("bootstrap.servers", "localhost:9092");
/* 制定consumer group */
props.put("group.id", "test");
/* 是否自动确认offset */
props.put("enable.auto.commit", "true");
/* 自动确认offset的时间间隔 */
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
/* key的序列化类 */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value的序列化类 */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 /* 定义consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 消费者订阅的topic, 可同时订阅多个 */
consumer.subscribe(Arrays.asList("foo", "bar"));

 /* 读取数据,读取超时时间为100ms */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

3. Принцип архитектуры Кафки

Давайте зададим несколько вопросов об архитектурном принципе kafka?

1. Как в Kafka хранятся темы и разделы и каковы их характеристики?

2. В чем преимущества модели потребления Kafka по сравнению с традиционными системами обмена сообщениями?

3. Как Kafka реализует распределенное хранение и чтение данных?

3.1 Схема архитектуры Кафки

3.2 объяснение термина кафка

В наборе архитектуры kafka есть несколько производителей, несколько брокеров и несколько потребителей.Каждый производитель может соответствовать нескольким темам, а каждый потребитель может соответствовать только одной группе потребителей.

Вся архитектура Kafka соответствует кластеру ZK, ZK управляет конфигурацией кластера, выбирает лидера и выполняет перебалансировку при изменении группы потребителей.

название

объяснять

Broker

Узел обработки промежуточного программного обеспечения сообщений, узел Kafka является брокером, и один или несколько брокеров могут формировать кластер Kafka.

Topic

Тема, Kafka классифицирует сообщения по темам, и каждое сообщение, публикуемое в кластере Kafka, должно указывать тему.

Producer

Производитель сообщений, клиент, который отправляет сообщения брокеру.

Consumer

Потребитель сообщений, клиент, который читает сообщения от брокера.

ConsumerGroup

Каждый потребитель принадлежит к определенной группе потребителей, и сообщение может быть отправлено нескольким различным группам потребителей, но только один потребитель в группе потребителей может использовать сообщение.

Partition

Физическая концепция, тема может быть разделена на несколько разделов, каждый раздел упорядочен внутри

3.3 Тема и раздел

У каждого сообщения в Kafka есть тема. Вообще говоря, в нашем приложении генерируются разные типы данных, и можно устанавливать разные темы. У темы обычно есть несколько подписчиков сообщений.Когда производитель публикует сообщение в теме, все потребители, подписавшиеся на эту тему, могут получать новые сообщения, написанные производителем.

Kafka поддерживает файл журнала распределенного раздела для каждой темы, и каждый раздел представляет собой журнал добавления на уровне хранилища Kafka. Любое сообщение, опубликованное в этом разделе, будет добавлено в конец лог-файла, и каждому сообщению в разделе будет присвоен монотонно возрастающий порядковый номер в хронологическом порядке, то есть наше смещение, смещение — длинное число, Через это смещение , мы можем определить уникальное сообщение под разделом. Заказ гарантирован по разделу, но не по теме.

На приведенной выше диаграмме наш производитель будет решать, в какой раздел отправлять данные.

1. Если значение ключа отсутствует, отправить его путем опроса.

2. Если есть значение ключа, выполните хеширование значения ключа, а затем возьмите оставшееся количество разделов, чтобы убедиться, что одно и то же значение ключа будет направлено в тот же раздел.Если вы хотите строгую последовательную согласованность очереди , вы можете установить для всех сообщений один и тот же ключ.

3.4 Модель потребления

После того, как сообщение будет отправлено производителем в кластер kafka, оно будет использовано потребителем. Вообще говоря, у нас есть две модели потребления: модель push (psuh) и модель pull (pull).

В системе сообщений, основанной на модели push, статус потребления записывается брокером сообщений. После того, как брокер сообщений отправляет сообщение потребителю, он помечает сообщение как использованное, но этот метод не может гарантировать семантику обработки при потреблении. Например, после того, как мы отправили сообщение потребителю, поскольку потребляющий процесс зависает или сообщение не получено по сетевым причинам, если мы пометим его как потребляемое в потребляющем агенте, сообщение будет безвозвратно потеряно. Если мы используем метод ответа после того, как производитель получил сообщение, брокер сообщений должен записывать статус потребления, что нежелательно. Если используется push, скорость потребления сообщений полностью контролируется агентом-потребителем.Как только потребитель будет заблокирован, возникнут проблемы.

Kafka использует модель вытягивания (опрос), которая сама контролирует скорость потребления и ход потребления, потребители могут потреблять в соответствии с любым смещением. Например, потребители могут использовать сообщения, которые были использованы для повторной обработки, или использовать последние сообщения и т. д.

3.5 Сетевая модель

3.5.1 KafkaClient — однопоточный селектор

Однопоточный режим подходит для небольшого количества одновременных ссылок, простой логики и небольшого объема данных.

В kafka и потребитель, и производитель используют описанный выше однопоточный режим. Этот режим не подходит для серверной части кафки, процесс обработки запросов в серверной части более сложен, что приведет к блокировке потока, при появлении последующих запросов они не будут обрабатываться, что вызовет большое количество таймаутов запросов. и вызывать лавины. На сервере многопоточность должна полностью использоваться для обработки логики выполнения.

3.5.2 Kafka--server -- многопоточный селектор

На сервере kafka используется многопоточная модель Selector.Acceptor работает в отдельном потоке.Для потоков в пуле потоков операции чтения событие чтения регистрируется в селекторе, отвечающем за логику работы запрос на чтение сервера. После успешного чтения поместите запрос в общую очередь очереди сообщений. Затем в пуле потоков записи выньте запрос и выполните над ним логическую обработку.Даже если поток запроса заблокирован, последующий округ получит запрос из очереди сообщений и обработает его, а логическая обработка будет обработана в поток записи.Событие OP_WIRTE зарегистрировано, поэтому на него также необходимо отправить ответ.

3.6 Модель высоконадежного распределенного хранилища

В Kafka высокая надежность модели гарантируется механизмом копирования, благодаря которому не происходит потери данных, даже если машина не работает.

3.6.1 Высокопроизводительное хранилище журналов

Все сообщения в теме в Kafka распределяются и хранятся на нескольких узлах в виде разделов. В то же время на машине kafka каждый раздел будет фактически соответствовать каталогу журнала, и в каталоге будет несколько сегментов журнала (LogSegment). Файл LogSegment состоит из двух частей, а именно файла «.index» и файла «.log», которые соответственно представлены как индексный файл сегмента и файл данных. Командные правила этих двух файлов таковы: первый сегмент раздела global начинается с 0, имя файла каждого последующего файла сегмента является значением смещения последнего сообщения предыдущего файла сегмента, числовой размер составляет 64 бита и длина составляет 20 цифр.Никакие числа не заполняются 0, как показано ниже, при условии, что сообщений 1000, каждый размер LogSegment равен 100, ниже показан индекс и журнал 900-1000:

Поскольку данные сообщения Kafka слишком велики, если все индексы будут построены, они будут занимать место и увеличивать время, поэтому Kafka выбирает метод разреженного индекса, чтобы индекс можно было напрямую ввести в память, что ускоряет работу. Частичная скорость запроса.

Кратко представим, как читать данные.Если мы хотим прочитать 911-е данные, то первым делом нужно выяснить, к какому сегменту он принадлежит, и найти файл, к которому он принадлежит по методу дихотомии.После нахождения 0000900.index и 00000900 .log, Затем переходим к индексу, чтобы найти индекс (911-900) = 11 или ближайший индекс меньше 11. Здесь мы находим индекс [10,1367] методом дихотомии, а затем передаем физическое положение этого индекса 1367, и начните после поиска, пока не будет найдено 911 фрагментов данных.

Выше описан процесс поиска смещения, но в большинстве случаев нам не нужно находить смещение, нам просто нужно его последовательно прочитать, а при последовательном чтении операционная система добавит между памятью и диском Кэш страницы — это операция предварительного чтения, которую мы обычно наблюдаем, поэтому наша операция последовательного чтения выполняется очень быстро. Однако с Kafka есть проблема. Если разделов слишком много, то будет много сегментов лога. При записи, поскольку он пишется партиями, он фактически станет случайной записью. Произвольный ввод-вывод оказывает большое влияние на производительность. В настоящее время. Так что в целом у Кафки не может быть слишком много разделов. В ответ на это RocketMQ записывает все логи в файл, который может стать последовательной записью, при определенной оптимизации чтение может быть близко к последовательному чтению.

Можно подумать: 1. Зачем нужны разделы, то есть раздел для темы всего один, неужели нельзя? 2. Зачем нужно сегментировать журналы

1. Разделение для горизонтального расширения 2. Если журнал слишком велик в том же файле, это повлияет на производительность. Запрос замедляется, если журнал растет бесконечно

3.6.2 Механизм копирования

Механизм репликации Kafka заключается в том, что несколько серверных узлов реплицируют журналы тематических разделов других узлов. Когда узел в кластере выходит из строя, запрос на доступ к отказавшему узлу будет передан другим нормальным узлам (этот процесс обычно называется Reblance).Каждый раздел каждой темы в Kafka имеет первичную реплику и 0 или более реплик.Реплики сохраняют их данные синхронизируются с первичной репликой и заменяются при сбое первичной реплики.

В Kafka не все реплики можно использовать для замены главной реплики, поэтому набор ISR (In sync Replicas) поддерживается в ведущем узле kafka, который в переводе также называется синхронизированным набором. выполняются условия:

  • Узлы должны оставаться подключенными к ZK

  • Реплика не может отставать слишком далеко от мастера во время синхронизации.

Кроме того, существует AR (назначенные реплики), используемый для определения полного набора реплик, OSR используется для представления набора реплик, которые были исключены из-за задержки, поэтому формула выглядит следующим образом: ISR = лидер + реплики, которые сильно не лагайте, AR = OSR + ISR;

Вот два существительных: HW (высокий уровень воды) — расположение раздела, которое может видеть потребитель, и LEO — расположение последнего сообщения в журнале каждого раздела. HW может гарантировать, что брокер, в котором находится лидер, выйдет из строя, и сообщение все еще можно будет получить от вновь избранного лидера, что не приведет к потере сообщения.

Когда производитель отправляет данные лидеру, уровень достоверности данных можно задать через параметр request.required.acks:

  • 1 (по умолчанию): это означает, что производитель отправляет следующее сообщение после того, как лидер в ISR успешно получил и подтвердил данные. Если лидер выйдет из строя, данные будут потеряны.

  • 0: Это означает, что производителю не нужно ждать подтверждения от брокера, чтобы продолжить отправку следующего пакета сообщений. В этом случае эффективность передачи данных самая высокая, но достоверность данных действительно самая низкая.

  • -1: Производитель должен дождаться, пока все последователи в ISR подтвердят, что данные были получены, прежде чем передача будет завершена, и надежность является самой высокой. Однако это не гарантирует, что данные не будут потеряны, например, когда в ISR есть только лидер (остальные узлы отключены от zk, либо не догнали), становится ситуация acks=1.

4. Модель высокой доступности и идемпотентность

Обычно в распределенных системах существует три семантики обработки:

  • Хотя бы один раз:

    По крайней мере, один раз, возможно, много раз. Если производитель получает подтверждение от ACK, это означает, что сообщение было записано в Кафку, что происходит точно один раз, что именно-один раз позади. Но если продюсер времена выходит или получает ошибку, а запрос. Required.acks конфигурация не является -1, она повторится отправка сообщения, и клиент подумает, что сообщение не было написано в Кафке. Если брокер не удается отправлять ACK, но после того, как сообщение успешно написано в Кафке, это повторение приведет к тому, что наше сообщение будет записано дважды, поэтому сообщение будет доставлено в конечному потребителю более одного раза, если логика обработки потребителей делает Не гарантированная идемпотентность даст неверные результаты.
    В этой семантике будет нарушение порядка, то есть когда первый акк не прошел и готов повторить попытку, но второе сообщение было отправлено, на этот раз будет нарушение порядка в одном разделе, нам нужно установить параметр производителяmax.in.flight.requests.per.connection, Flight.requests — это очередь, используемая производителем для сохранения отправляемых запросов без ответа, гарантируя, что количество неотвеченных запросов на источнике равно 1.

  • at-most-once:

    Если производитель не повторяет попытку после истечения времени ожидания подтверждения или возвращает ошибку, то есть мы говорим request.required.acks=-1, сообщение может в итоге не быть записано в kafka, поэтому потребитель не получит сообщение.

  • ровно один раз:

    Ровно один раз, даже если производитель попытается отправить сообщение, сообщение гарантированно будет доставлено потребителю не более одного раза. Эта семантика является наиболее желательной и наиболее сложной для реализации. Точность-один раз не может быть гарантирована до 0,10, и необходимо использовать гарантию идемпотентности, которая поставляется с потребителем. 0.11.0 использует гарантии транзакций

4.1 Как реализовать ровно один раз

Для реализации ровно один раз в Kafka 0.11.0 есть две официальные стратегии:

4.1.1 Один производитель и одна тема

После инициализации каждому производителю будет назначен уникальный PID, для каждого уникального PID, передаваемого в назначенный раздел темы в конкретном сообщении, производитель будет иметь монотонно увеличивающийся от 0 порядковый номер.

На стороне брокера мы также будем поддерживать параметр , который будет выравниваться и проверяться каждый раз при отправке сообщения:

  • Если порядковый номер сообщения на один или несколько больше, чем порядковый номер, поддерживаемый Брокером, это означает, что в середине есть данные, которые не были записаны, то есть не по порядку. В это время Брокер отклоняет сообщение и производитель выдает InvalidSequenceNumber

  • Если порядковый номер сообщения меньше или равен порядковому номеру, поддерживаемому брокером, это означает, что сообщение было сохранено, что является дубликатом сообщения.Брокер напрямую отбрасывает сообщение, а производитель выдает DuplicateSequenceNumber.

  • Если порядковый номер сообщения больше на единицу, оно считается допустимым.

Упомянутое выше решает две проблемы:

1. Когда лодость отправила сообщение не удается, брокер и не сохраняется, но второе сообщение было успешно отправлено, в результате чего выходит из данных заказа.

2. Когда производитель отправляет сообщение, брокер успешно сохраняет его, возврат подтверждения завершается ошибкой, и производитель снова доставляет дубликат сообщения.

Все вышеперечисленное находится под одним и тем же PID, а значит должно быть гарантировано в рамках одной сессии в одном Producer.Если Producer зависнет, то будет назначен новый PID, поэтому гарантии нет, поэтому в Kafka есть транзакции , механизм для обеспечения.

4.1.2 Транзакции

Роль транзакций в kafka

  • Реализовать семантику ровно один раз

  • Гарантия атомных операций либо все преуспевает, либо все сбой.

  • Восстановление государственных операций

Транзакция может гарантировать, что даже если она охватывает несколько , операции в очереди потребления в этой транзакции рассматриваются как атомарные, либо все выполняются успешно, либо все завершаются неудачно. Кроме того, приложения с отслеживанием состояния также могут гарантировать продолжение обработки с точки останова после перезапуска, то есть восстановление транзакции. В транзакции kafka приложение должно предоставить уникальный идентификатор транзакции, то есть идентификатор транзакции, и он не изменится после выключения и перезапуска.Идентификатор транзакции и PID могут соответствовать один к одному. Разница в том, что идентификатор транзакции предоставляется пользователем, в то время как PID реализован внутри и прозрачен для пользователя. После перезапуска Producer старый Producer с тем же идентификатором транзакции становится недействительным.Каждый раз, когда Producer получает PID через идентификатор транзакции, он также получает монотонно возрастающую эпоху. Поскольку эпоха старого Продюсера меньше эпохи нового Продюсера, Кафка может легко распознать, что Продюсер — это старый Продюсер, и отклонить его запрос. Для этого в Kafka 0.11.0.0 появился серверный модуль под названием Transaction Coordinator, который управляет транзакционным характером сообщений, отправляемых производителями. Координатор транзакций ведет журнал транзакций, который хранится во внутренней теме. Поскольку данные Topic являются постоянными, состояние транзакции также является постоянным. Производитель не читает и не записывает журнал транзакций напрямую, он связывается с координатором транзакций, а затем координатор транзакций вставляет статус транзакции в соответствующий журнал транзакций. Структура журнала транзакций аналогична смещению, используемому журналом смещения для сохранения потребителя.

Наконец

Что касается некоторых распространенных вопросов на собеседовании об очереди сообщений или Кафке, из приведенной выше статьи можно извлечь следующие классические вопросы:

  1. Зачем использовать очереди сообщений? Какова роль очереди сообщений?
  2. Как темы и разделы хранятся в Kafka и каковы их характеристики?
  3. Модель потребления Kafka по сравнению с традиционными системами обмена сообщениями. Каковы ее преимущества?
  4. Как Kafka обеспечивает распределенное хранение и чтение данных?
  5. Почему kafka поддерживает меньше автономных частей, чем RocketMQ?
  6. Зачем вам разделы, то есть в теме только один раздел, не так ли?
  7. Почему журналы должны быть сегментированы?
  8. Какой механизм использует Kafka для обеспечения высокой надежности и доступности?
  9. Как очередь сообщений гарантирует, что сообщения являются идемпотентными?
  10. Позвольте вам создать свою собственную очередь сообщений, как бы вы спроектировали то, что будет рассматриваться?

Большинство проблем могут найти ответ с резюме выше, если это не так, то я бы сосредоточился на общественных числах, позвольте мне ответить на это для вас.

Наконец, эта статья была включена в JGrowing, всеобъемлющий и отличный маршрут изучения Java, совместно созданный сообществом. Если вы хотите участвовать в обслуживании проектов с открытым исходным кодом, вы можете создать его вместе. Адрес github: https:// github.com/javagrowing/JGrowing Пожалуйста, дайте мне маленькую звезду.

Рекламируйте, если вы думаете, что в этой статье есть статьи для вас, вы можете подписаться на мой технический паблик, ваше внимание и пересылка - самая большая поддержка для меня, O (∩_∩) O