предисловие
вышеРазработайте систему push-уведомлений на миллион уровнейУпоминается, что поток сообщений принимаетKafka
как промежуточное ПО.
Среди них друг консультировал в случае большого количества сообщенийKakfa
Как обеспечить эффективность и последовательность сообщения?
просто в сочетании с этим вопросомKakfa
В исходном коде обсуждается, как правильно и эффективно отправлять сообщения.
Контента много, друзья, кому интересны исходники, прошу пристегнуть ремни😏 (исходники основаны на
v0.10.0.0
анализ версий). В то же время лучше иметь некоторый опыт использования Kafka и знать основы использования.
простой обмен сообщениями
Давайте посмотрим, как выглядит простая отправка сообщения, прежде чем анализировать его.
Следующий код построен на SpringBoot.
Сначала создайтеorg.apache.kafka.clients.producer.Producer
Боб.
Основная проблемаbootstrap.servers
, который является обязательным параметром. Относится к адресу брокера в кластере Kafka, например127.0.0.1:9094
.
Остальные параметры пока не обсуждаются и будут подробно представлены позже.
Затем введите этот компонент, чтобы вызвать его функцию отправки для отправки сообщения.
Вот я отправляю 10Вт кусков данных в определенную тему, а сообщение запущенной программы отправляется нормально.
Но это только для того, чтобы добиться отправки сообщения, ему все равно успешно доставлено сообщение или нет, оно равно чистому异步
Путь.
Синхронизировать
Итак, я хочу знать, успешно ли отправлено сообщение или нет, что мне делать?
фактическиProducer
изAPI
Он у нас уже учтен.После отправки нам нужно только вызвать егоget()
метод для получения результата отправки синхронно.
Отправить результат:
Эффективность такой отправки на самом деле относительно низкая, так как каждый раз приходится синхронно ждать результата отправки сообщения.
асинхронный
По этой причине мы должны отправлять его асинхронно.send()
Метод асинхронный по умолчанию, если он не вызывается вручную.get()
метод.
Но это не будет известно в результате отправки.
так что проверяйsend()
API можно найти там же параметр.
Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);
Callback
Это интерфейс обратного вызова, который может вызвать нашу пользовательскую реализацию после отправки сообщения.
Результат после выполнения:
Те же результаты также могут быть получены, и обнаружено, что поток обратного вызова не синхронизирован выше.主线程
, которые также могут оказаться асинхронными обратными вызовами.
При этом callback будет передавать два параметра:
-
RecordMetadata
Метаданные после того, как сообщение, соответствующее приведенному выше, успешно отправлено. -
Exception
Информация об исключении при отправке сообщения.
Однако эти два параметра не имеют данных одновременно, только если отправка не удалась, будет ненормальная информация, а метаданные отправки пусты.
Таким образом, правильное написание должно быть:
Что касается того, почему только один параметр имеет значение, это будет объяснено по одному в анализе исходного кода ниже.
Анализ исходного кода
Сейчас я освоил только базовую отправку сообщений, если вы хотите глубоко разобраться в некоторых конфигурациях параметров при отправке, последнее слово за исходным кодом.
Прежде всего, поговорим обо всем процессе отправки сообщения.Kafka
Это не так просто, как отправлять сообщения по сети наbroker
, внутри Java все еще много оптимизаций и дизайнов.
Отправить процесс
Чтобы интуитивно понять процесс отправки, я просто нарисовал несколько ключевых шагов в процессе отправки.
Сверху вниз расположены:
- Инициализировать и фактически отправить сообщение
kafka-producer-network-thread
поток ввода-вывода. - Сериализировать сообщение.
- Получите раздел, который необходимо отправить.
- Запись во внутренний буфер.
- Инициализированный поток ввода-вывода постоянно использует этот буфер для отправки сообщений.
Пошаговый анализ
Каждый шаг подробно объясняется ниже.
инициализация
При вызове этого конструктора для инициализации это больше, чем просто запись основных параметров.KafkaProducer
. Более хлопотной вещью является инициализацияSender
Поток для потребления буфера.
Инициализируйте поток ввода-вывода:
Вы можете видеть, что потоку Sender нужны переменные-члены, такие как:
acks,retries,requestTimeout
и т.д. Эти параметры будут проанализированы позже.
сериализованное сообщение
вызовsend()
После функции первый шаг — сериализация, ведь наши сообщения нужно отправлять в Kafka через сеть.
один из нихvalueSerializer.serialize(record.topic(), record.value());
Это интерфейс, нам нужно указать класс реализации сериализации во время инициализации.
Мы также можем реализовать сериализацию сами, просто нужно реализоватьorg.apache.kafka.common.serialization.Serializer
интерфейс.
раздел маршрутизации
Далее идет раздел маршрутизации, обычно мы используемTopic
Несколько разделов созданы для масштабируемости и высокой производительности.
Если это раздел, все сообщения могут быть записаны в него.
Но несколько разделов неизбежно должны знать, в какой раздел производить запись.
Обычно есть три пути.
назначенный раздел
может быть построенProducerRecord
Укажите раздел для каждого сообщения.
Таким образом, будет судить, существует ли обозначение во время маршрутизации, и раздел будет использоваться напрямую.
Это обычно используется при специальных сценах.
настраиваемая стратегия маршрутизации
Если раздел не указан, он будет называтьсяpartitioner.partition
Интерфейс обеспечивает настраиваемую стратегию разбиения.
И нам нужно только настроить реализацию классаorg.apache.kafka.clients.producer.Partitioner
интерфейс, при созданииKafkaProducer
Конфигурация экземпляраpartitioner.class
параметр.
Обычно настраиваемые разделы необходимы, чтобы максимально обеспечить порядок сообщений.
Либо писать в какие-то определенные разделы, которые обрабатываются специальными потребителями и т.д.
политика по умолчанию
Последняя — это политика маршрутизации по умолчанию, которая применяется, если мы ничего не делаем.
Эта стратегия также делает распределение сообщений более равномерным.
Давайте посмотрим на его реализацию:
Проще говоря, он делится на следующие этапы:
- Получите количество разделов темы.
- +1 потокобезопасный счетчик, поддерживаемый внутри.
- По модулю количество разделов, чтобы получить номер раздела.
Фактически, это типичный алгоритм опроса, так что до тех пор, пока количество разделов не меняется часто, этот метод будет более равномерным.
запись во внутренний кеш
существуетsend()
После того, как метод получит раздел, он вызоветappend()
функция:
Эта функция вызоветgetOrCreateDeque()
запись во внутренний кешbatches
.
кеш потребления
Поток ввода-вывода, инициализированный в самом начале, на самом деле является потоком демона, который всегда будет потреблять данные.
Ранее записанные данные будут получены через несколько функций на рисунке. Этот контент может не нуждаться в углублении, но естьcompleteBatch
Метод очень важен.
При вызове этого метода сообщение должно быть отправлено, поэтому он будет вызванbatch.done()
Прежде чем мы завершим этоsend()
Интерфейс обратного вызова, определенный в методе.
Отсюда также видно, почему после завершения передачи появятся только одни метаданные и информация об исключении.
Анализ параметров производителя
После завершения процесса отправки давайте посмотримProducer
некоторые из наиболее важных параметров.
acks
acks
Это ключ параметр, влияющий на пропускную способность сообщения.
Есть[all、-1, 0, 1]
По умолчанию для этих параметров установлено значение 1.
так какKafka
Это не режим ведущий-ведомый, а режим ведущий-ведомый, аналогичный Zookeeper.
предпосылка
Topic
Настроить количество репликreplica > 1
.
когдаacks = all/-1
Время:
Это означает, что он гарантирует, что все последующие реплики завершат запись данных перед возвратом.
Это гарантирует, что сообщения не будут потеряны!
Но при этом производительность и пропускная способность самые низкие.
когдаacks = 0
Время:
Производитель не будет ждать копии любого ответа, поэтому, скорее всего, потеряет сообщения, но производительность самая лучшая!
когдаacks = 1
Время:
Это компромисс, он будет ждать ответа лидера реплики, но не ведомого.
Как только Лидер повесит трубку, сообщение будет потеряно. Но производительность и безопасность сообщений в определенной степени гарантируются.
batch.size
Из названия известно, что этот параметр ограничивает размер области внутреннего буфера, его правильное увеличение может улучшить пропускную способность.
Но нельзя экстремально, Тур Конгресса тратить память. Маленький, не может играть роли, и это также типичный компромисс времени и пространства.
Рисунок выше является отражением нескольких вариантов использования.
retries
retries
Этот параметр в основном используется для повторных попыток, когда происходит некоторое дрожание сети, это вызывает повторные попытки.
Этот параметр также предназначен для ограничения количества повторных попыток.
Но есть и некоторые другие проблемы.
- Поскольку это повторная передача, порядок сообщений может быть несогласованным, что также является случаем, когда даже сообщение о разделе не будет полностью упорядочено, как указано выше.
- Или из-за проблем с сетью, исходное сообщение было успешно написано, но не успешно отреагировано на производителе, он может появиться при попытке повторения
消息重复
.这种只能是消费者进行幂等处理。
Эффективный способ отправки
Если объем сообщения действительно очень большой, и вам нужно отправить сообщение наKafka
. Одинproducer
Всегда получайте эффекты, такие как размер кеша и т. д.
Можно ли создать несколькоproducer
отправить его?
- Настройте максимальное количество производителей.
- При отправке сообщения сначала получите
producer
, и определить, достигнут ли максимальный верхний предел при получении, а если нет, то создать новый и сохранить во внутреннийList
, хорошо поработайте над синхронизацией при сохранении, чтобы предотвратить проблемы параллелизма. - При получении отправителя вы можете использовать метод опроса, чтобы получить его в соответствии со стратегией разделения по умолчанию (чтобы обеспечить равномерное использование).
Таким образом, в сценарии с большой и частой отправкой сообщений можно повысить эффективность отправки иproducer
давление.
Закрыть продюсер
и наконецProducer
Производитель потребляет много ресурсов (потоки, память, сеть и т. д.) во время использования, поэтому его необходимо явно закрыть, чтобы переработать эти ресурсы.
дефолтclose()
Оба метода, так и методы с тайм-аутами вынуждены закрыть после определенного времени.
Однако оставшиеся задачи будут обработаны до истечения срока действия.
Так что, какой из них использовать, зависит от ситуации.
Суммировать
Эта статья содержит много контента и анализирует производителей Kafka с точки зрения примеров и исходного кода.
Я надеюсь, что друзья, прочитавшие это, смогут что-то получить, и добро пожаловать, чтобы оставить сообщение для обсуждения.
Неудивительно, что мы обсудим потребителей Kafka в следующем выпуске.
Если это поможет вам, пожалуйста, поделитесь этим для большего количества людей, чтобы увидеть.
Добро пожаловать, чтобы обратить внимание на публичный аккаунт, чтобы общаться вместе: