Независимо от того, используется ли Kafka в качестве очереди сообщений, сообщений, шины или платформы хранения данных, всегда необходимо иметь производителя, который может записывать данные в Kafka, и потребителя, который может читать данные из Kafka, или комбинацию обоих ролевых приложений.
Например, в системе обработки транзакций по кредитным картам есть клиентское приложение, которое может быть интернет-магазином, ответственным за отправку транзакций в Kafka всякий раз, когда происходит платеж. Другое приложение проверяет транзакцию на соответствие механизму правил и решает, утвердить ее или отклонить. Утвержденные или отклоненные ответные сообщения записываются обратно в Kafka, а затем отправляются в интернет-магазин, инициировавший транзакцию. Третье приложение считывает состояние транзакций и аудита из Kafka, сохраняет их в базе данных, а затем аналитики могут анализировать эти результаты и, возможно, улучшать механизм правил.
Разработчики могут использовать встроенный клиентский API Kafka для разработки приложений Kafka.
В этой главе мы начнем с дизайна и компонентов производителя Kafra и узнаем, как использовать производителя Kafka. Мы покажем, как создавать объекты KafkaProducer и ProducerRecords, как отправлять записи в Kafka и как обрабатывать ошибки, возвращенные из Kafka, затем представим важные параметры конфигурации, управляющие поведением производителей, и, наконец, углубимся в то, как использовать различные методы секционирования. и сериализаторы последовательностей, а также как настроить сериализаторы и разделители.
В следующей главе мы познакомимся с потребительским клиентом Kafra и узнаем, как читать сообщения от Kafka.
Обзор производителя
Приложение должно писать сообщения в Kafka во многих ситуациях: записывать действия пользователя (для аудита и анализа), записывать метрики, сохранять журналы, сообщения, записывать информацию об интеллектуальных устройствах, асинхронно взаимодействовать с другими приложениями, буферизировать данные, которые будут записаны в база данных и т.д.
Различные сценарии использования означают различные требования: учитывается ли каждое сообщение? Допустима ли потеря части сообщений? Допустимы ли случайные дубликаты сообщений? Существуют ли строгие требования к задержке и пропускной способности?
В упомянутой ранее системе обработки транзакций по кредитным картам не допускается потеря или дублирование сообщений, максимально допустимая задержка составляет 500 мс, а требования к пропускной способности высоки.Мы надеемся обрабатывать один миллион сообщений в секунду.
Сохранение информации о кликах на веб-сайте — еще один сценарий использования. В этом сценарии допускается потеря или повторение небольшого количества сообщений, а задержка может быть выше, если это не влияет на взаимодействие с пользователем. Другими словами, пока страница загружается сразу после того, как пользователь щелкнет ссылку, мы не возражаем против того, что сообщению потребуется несколько секунд, чтобы достичь сервера Kafka. Пропускная способность зависит от того, как часто пользователи используют сайт.
Различные сценарии использования напрямую влияют на использование и конфигурацию API-интерфейса производителя.
Хотя API производителя прост в использовании, процесс отправки сообщений немного сложнее. На следующей диаграмме показаны основные этапы отправки сообщения в Kafka.
Диаграмма компонентов производителя KafkaНачнем с создания объекта ProducerRecord, который должен содержать целевую тему и данные для отправки. Мы также можем указать ключи или разделы. При отправке объекта ProducerRecord производитель сначала сериализует объекты ключей и значений в массивы байтов, чтобы их можно было передавать по сети.
Далее данные передаются разделителю. Если раздел был ранее указан в объекте ProducerRecord, то разделитель ничего не сделает и вернет указанный раздел напрямую. Если раздел не указан, разделитель выбирает раздел на основе ключа объекта ProducerRecord. После выбора раздела производитель знает, в какую тему и раздел отправить запись. Затем запись добавляется в пакет записей, и все сообщения в этом пакете отправляются в ту же тему и раздел. Отдельный поток отвечает за отправку пакетов этих записей соответствующему брокеру.
Сервер возвращает ответ, когда он получает эти сообщения. Если сообщение успешно записано в Kafka, возвращается объект RecordMetaData, который содержит информацию о теме и разделе, а также смещение, записанное в разделе. Если запись не удалась, возвращается ошибка. Производитель попытается повторно отправить сообщение после получения ошибки, и если он по-прежнему терпит неудачу через несколько раз, он вернет сообщение об ошибке.
Создать производителя Kafka
Чтобы писать сообщения в Kafka, сначала создайте объект производителя и установите некоторые свойства.
В следующем фрагменте кода показано, как создать нового производителя, здесь указываются только необходимые свойства, остальные используют настройки по умолчанию.
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Производитель Kafka имеет 3 обязательных свойства
bootstrap.servers
Это свойство определяет список адресов брокера в формате хост:порт. Список не обязательно должен содержать все адреса брокеров, производитель найдет информацию о других брокерах у данного брокера. Однако рекомендуется предоставить информацию как минимум о двух брокерах, чтобы в случае выхода из строя одного из них производитель все равно мог подключиться к кластеру.
key.serializer
Ключ и значение сообщения, которое брокер ожидает получить, представляют собой массивы байтов. Интерфейс производителя позволяет использовать параметризованные типы, поэтому объекты Java можно отправлять брокеру в виде ключей и значений. Такой код очень удобочитаем, но производителю нужно знать, как преобразовать эти Java-объекты в байтовые массивы. Для key.serializer должен быть задан класс, реализующий интерфейс org.apache.kafka.common.serialization.Serializer, который производитель будет использовать для сериализации ключевых объектов в байтовые массивы. Клиент Kafka по умолчанию предоставляет ByteArraySerializer (который делает очень мало вещей), StringSerializer и IntegerSerializer, поэтому, если вы используете только несколько распространенных типов объектов Java, нет необходимости реализовывать собственный сериализатор. Обратите внимание, что key.serializer должен быть установлен, даже если вы собираетесь отправлять только содержимое значения.
value.serializer
Как и key.serializer, класс, указанный в value.serializer, будет сериализовать значение. Если и ключи, и значения являются строками, можно использовать тот же сериализатор, что и key.serializer. Если ключ представляет собой целочисленный тип, а значение представляет собой символьный сектор, необходимо использовать другой сериализатор.
Существует три основных способа отправки сообщений:
1. Выстрелил-забыл: мы отправляем сообщение на сервер, но нам все равно, дойдет ли оно нормально. В большинстве случаев сообщения приходят нормально, потому что Kafka высокодоступна, и производители автоматически пытаются отправить их повторно. Однако при использовании этого метода некоторые сообщения иногда теряются.
2. Синхронная отправка: мы используем метод send() для отправки сообщения, он возвращает объект Future, вызывает метод get() для ожидания, и тогда мы можем узнать, успешно ли отправлено сообщение.
3. Асинхронная отправка: мы вызываем метод send() и указываем функцию обратного вызова, которую сервер вызывает при возврате ответа.
В следующих примерах мы покажем, как использовать вышеуказанные методы для отправки сообщений и как работать с возможными исключениями.
Во всех примерах в этой главе используется один поток, но производитель может использовать несколько потоков для отправки сообщений. В начале можно использовать один потребитель и один поток. Если требуется более высокая пропускная способность, количество потоков можно увеличить без изменения количества производителей. Если этого недостаточно, количество производителей можно увеличить.
Отправить сообщение Кафке
Самый простой способ синхронной отправки сообщений выглядит следующим образом:
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
Метод send() производителя принимает объект ProducerRecord в качестве параметра, ему требуется имя целевой темы и объекты ключа и значения для отправки, которые являются строками. Типы объектов ключей и значений должны соответствовать объектам сериализатора и производителя.
Мы используем сторону send() производителя для отправки объекта ProducerRecord. Как видно из схемы архитектуры производителя, сообщение сначала помещается в буфер, а затем отправляется на сервер с помощью отдельного потока. Метод send() возвращает объект Future, содержащий RecordMetadata, но, поскольку мы игнорируем возвращаемое значение, невозможно узнать, было ли сообщение отправлено успешно. Если вас не волнует отправка результатов, вы можете использовать этот метод отправки. Например, вести журнал сообщений Twitter или менее важных журналов приложений.
Мы можем игнорировать ошибки, которые могут возникнуть при отправке сообщения, или ошибки, которые могут возникнуть на стороне сервера, но другие исключения могут возникнуть у производителя до отправки сообщения. Этими исключениями могут быть SerializationException (указывающие на сбой при сериализации сообщения), BufferExhaustedException или TimeoutException (указывающие на то, что буфер заполнен) или InterruptException (указывающие на то, что отправляющий поток был прерван).
Отправлять сообщения синхронно
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
Здесь сторона производителя.send() сначала возвращает объект Future, а затем вызывает метод get() объекта Future, чтобы дождаться ответа Kafka. Если сервер вернет ошибку, get() выдаст исключение. Если ошибок не возникает, мы получаем объект RecordMetadata, который можно использовать для получения смещения сообщения. Если перед отправкой данных или во время процесса отправки возникает какая-либо ошибка, например, брокер возвращает исключение, которое не разрешает повторную передачу сообщения, или превышено количество повторных передач, то будет выдано исключение. Мы просто печатаем информацию об исключении.
Как обрабатывать ошибки, возвращаемые производителями Kafka
В KafkaProducer обычно возникают два типа ошибок. Одна категория — повторяемые ошибки, которые можно устранить путем повторной отправки сообщения. Например, для ошибок соединения это может быть решено путем повторного установления соединения, а ошибки «без лидера» могут быть решены путем повторного выбора лидера для раздела. KafkaProducer можно настроить на автоматический повтор, и если ему не удастся решить проблему после нескольких попыток, приложение получит исключение повторной попытки. Другой тип ошибок, которые не могут быть устранены повторными попытками, например исключения «сообщение слишком велико». Для такого рода ошибок KafkaProducer не будет предпринимать никаких повторных попыток и сразу выдаст исключение.
Отправляйте сообщения асинхронно
Предположим, что сообщению требуется 10 мс, чтобы пройти туда и обратно между приложением и кластером Kafka. Если вы ждете ответа после отправки каждого сообщения, для отправки 100 сообщений потребуется 1 секунда. Но если просто отправить сообщение и не ждать ответа, на отправку 100 сообщений уйдет намного меньше времени. В большинстве случаев нам не нужно ждать ответа — хотя Kafka отправит обратно тему назначения, информацию о разделе и смещение сообщения, это не обязательно для отправляющего приложения. Однако, когда мы сталкиваемся с ошибкой отправки сообщения, нам нужно создать исключение, зарегистрировать ошибку или записать сообщение в файл «сообщения об ошибке» для последующего анализа.
Для обработки исключений при асинхронной отправке сообщений производитель предоставляет поддержку обратного вызова. Ниже приведен пример использования асинхронной отправки сообщений, обратных вызовов.
Конфигурация производителя
До сих пор мы рассмотрели только несколько необходимых параметров конфигурации для производителя — API bootstrap.servers и сериализатор.
Производитель также имеет множество настраиваемых параметров, которые описаны в документации Kafka, большинство из них имеют разумные значения по умолчанию, поэтому их не нужно изменять. Однако есть несколько параметров, которые оказывают большое влияние на производителей с точки зрения использования памяти, производительности и надежности, и мы объясним их один за другим.
1. acks
Параметр acks указывает, сколько реплик раздела должно получить сообщение, прежде чем производитель сочтет запись сообщения успешной.
Этот параметр оказывает существенное влияние на вероятность потери сообщения. Этот параметр имеет следующие опции. • Если acks=0, производитель не будет ждать ответа от сервера, прежде чем успешно написать сообщение. То есть, если что-то пойдет не так и сервер не получит сообщение, производитель не сможет узнать об этом, и сообщение будет потеряно. Однако, поскольку производителю не нужно ждать ответа от сервера, он может отправлять сообщения с максимальной скоростью, которую может поддерживать сеть, что обеспечивает высокую пропускную способность.
• Если acks=1, производитель будет получать ответ об успехе от сервера каждый раз, когда ведущий узел кластера получает сообщение. Если сообщение доходит до узла-лидера без коллизий (например, узел-лидер рушится, а новый лидер не выбран), производитель получит ответ об ошибке и, во избежание потери данных, отправит сообщение повторно. Однако если новым лидером станет узел, не получивший сообщение, сообщение все равно будет потеряно. Пропускная способность в это время зависит от того, используется ли синхронная или асинхронная передача. Если вы заставите клиент-отправитель ждать ответа сервера (вызвав метод get() объекта Future), вы, очевидно, увеличите задержку (задержку приема-передачи в сети). Если клиент использует асинхронные обратные вызовы, проблема задержки может быть смягчена, но пропускная способность по-прежнему ограничена количеством отправляемых сообщений (например, сколько сообщений может отправить производитель, прежде чем получит ответ от сервера).
• Если acks=all, производитель получит успешный ответ от сервера, только если все узлы, участвующие в репликации, получили сообщение. Этот режим самый безопасный, он гарантирует, что сообщение получит более одного сервера, и даже если один сервер выйдет из строя, весь кластер все равно сможет функционировать (подробнее в главе 5). Однако его задержка выше, чем при acks=1, потому что нам приходится ждать, пока сообщение будет получено более чем одним серверным узлом.
2. buffer.memory
Этот параметр используется для установки размера буфера памяти производителя, который используется производителем для буферизации сообщений, отправляемых на сервер. Если приложение отправляет сообщения на сервер быстрее, чем может, это приведет к нехватке места у производителя. В это время вызов метода send() либо блокируется, либо генерируется исключение, в зависимости от того, как установлен параметр block.on.buffe.full (в версии 0.9.0.0 заменен на max.block.ms, указывающий, что throw Может заблокироваться на некоторое время перед исключением).
3. compression.type
По умолчанию сообщения отправляются без сжатия. Для этого параметра можно задать значение snappy, gzip или lz4, которое указывает, какой алгоритм сжатия используется для сжатия сообщения перед его отправкой брокеру. Алгоритм быстрого сжатия был изобретен Google. Он потребляет меньше ресурсов ЦП, но может обеспечить более высокую производительность и значительный коэффициент сжатия. Если вас больше беспокоит производительность и пропускная способность сети, вы можете использовать этот алгоритм. Алгоритм сжатия gzip обычно требует больше ресурсов ЦП, но обеспечивает более высокую степень сжатия, поэтому этот алгоритм можно использовать, если пропускная способность сети относительно ограничена. Использование сжатия может уменьшить нагрузку на передачу по сети и нагрузку на хранилище, что часто является узким местом при отправке сообщений в Kafka.
4. retries
Ошибка, которую производитель получает от сервера, может быть временной ошибкой (например, раздел не может найти лидера). В этом случае значение параметра retries определяет, сколько раз производитель может повторно отправить сообщение, если это число будет достигнуто, производитель прекратит повторную попытку и вернет ошибку. По умолчанию производитель будет ждать 100 мс между повторными попытками, но этот интервал можно изменить с помощью параметра retries.backoff.ms. Перед установкой числа повторных попыток и интервала повторных попыток рекомендуется проверить, сколько времени потребуется для восстановления отказавшего узла (например, сколько времени потребуется всем разделам для выбора лидера), чтобы общее время повторных попыток было больше, чем восстановление кластера Kafka после сбоя долго, иначе производитель преждевременно откажется от повторной попытки. Однако некоторые ошибки не являются временными и не могут быть устранены путем повторной попытки (например, ошибки "слишком тихие"). Как правило, в логике кода нет необходимости обрабатывать повторяющиеся ошибки, поскольку производитель автоматически повторяет попытку. Вам нужно обрабатывать только неповторяемые ошибки или повторные попытки, которые превышают лимит.
5. batch.size
Когда в один и тот же раздел необходимо отправить несколько сообщений, производитель помещает их в пакет. Этот параметр указывает объем памяти, который может использовать пакет, в байтах (не в сообщениях). Когда пакет заполнен, все сообщения в пакете будут отправлены. Тем не менее, скважины-производители не обязательно все ждут, пока пакет не будет заполнен перед отправкой, и могут быть отправлены наполовину захваченные пакеты, даже пакеты, содержащие только одно сообщение. Таким образом, даже если вы установите очень большой размер пакета, это не приведет к задержке, а просто займет больше памяти. Но если задать слишком маленькое значение, это добавит дополнительные накладные расходы, поскольку производителю необходимо отправлять сообщения чаще.
6. linger.ms
Этот параметр указывает, как долго производитель ожидает добавления дополнительных сообщений к пакету перед отправкой пакета. KafkaProducer будет отправлять пакеты, когда пакет будет заполнен или когда будет достигнут предел linger.ms. По умолчанию производитель будет отправлять сообщения, пока доступны потоки, даже если в пакете только одно сообщение. Если задать для linger.ms значение больше 0, производитель некоторое время будет ждать перед отправкой пакета, чтобы в пакет было добавлено больше сообщений. Хотя это увеличивает задержку, это также увеличивает пропускную способность (поскольку одновременно отправляется больше сообщений, накладные расходы на сообщение меньше).
7. client.id
Этот параметр может быть любой строкой, которую сервер будет использовать для идентификации источника сообщения, а также может использоваться в журналах и метриках квот.
8. max.in.flight.requests.per.connection
Этот параметр указывает, сколько сообщений может отправить производитель, прежде чем получит ответ от сервера. Чем выше его значение, тем больше используется памяти, но при этом повышается и пропускная способность. Установка его в 1 гарантирует, что сообщения будут записываться на сервер в том порядке, в котором они были отправлены, даже если будут повторные попытки.
9. timeout.ms, request.timeout.ms и metadata.fetch.timeout.ms
request.timeout.ms указывает время ожидания производителем ответа сервера при отправке данных, а metadata.fetch.timeout.ms указывает время ожидания производителем ответа сервера при получении метаданных. (например, кто является лидером целевого раздела). Если время ожидания ответа истекло, производитель либо повторит попытку отправки данных, либо вернет ошибку (сгенерирует исключение или выполнит обратный вызов). timeout.ms указывает время, в течение которого брокер ожидает, пока синхронная реплика вернет подтверждение сообщения, которое соответствует конфигурации запросов. Если в течение указанного времени от синхронной реплики не будет получено подтверждения, брокер вернет ошибку.
10. max.block.ms
Этот параметр указывает время блокировки производителя при вызове метода send() или использовании partitionFor() для получения метаданных. Эти стороны блокируются, когда перехвачен буфер отправки производителя или когда нет доступных метаданных. Когда время блокировки достигает max.block.ms, производитель выдает исключение тайм-аута.
11 . max.request.size
Этот параметр используется для управления размером запроса, отправляемого производителем. Он может относиться к максимальному значению одного сообщения, которое может быть отправлено, или к общему размеру всех сообщений в одном запросе. Например, если предположить, что это значение равно 1 МБ, то самое большое отдельное сообщение, которое может быть отправлено, составляет 1 МБ, или производитель может отправить пакет из 1000 сообщений, каждое размером 1 КБ, в одном запросе. Кроме того, у посредника также есть собственный предел максимального значения сообщений, которые могут быть получены ( message.max.bytes ), поэтому лучше всего согласовать конфигурации с обеих сторон, чтобы избежать отклонения сообщения, отправленного производителем. брокер.
12. Receive.buffer.bytes и send.buffer.bytes
Эти два параметра определяют размер буфера сокета TCP для приема и отправки пакетов соответственно. Если они установлены на -1 , используются значения операционной системы по умолчанию. Если производитель или потребитель находится в другом центре обработки данных, чем брокер, эти значения могут быть соответствующим образом увеличены, поскольку сеть между центрами обработки данных обычно имеет более высокую задержку и более низкую пропускную способность.
гарантия заказа
Kafka может гарантировать, что сообщения в одном и том же разделе упорядочены. То есть, если производитель отправляет сообщения в определенном порядке, брокер будет записывать их в раздел в этом порядке, и потребитель будет читать их в том же порядке. В некоторых случаях порядок очень важен. Если количество повторных попыток установлено ненулевым целым числом, а для параметра max.in.flight.requests.per.connection задано число больше 1, то если первая партия сообщений не может быть записана, а вторая партия пишет. запись успешна, брокер повторит попытку записи первой партии. Если первый пакет также успешно записан в это время, порядок двух пакетов меняется на противоположный.
Вообще говоря, если некоторые сценарии требуют упорядочения сообщений, также важно, чтобы сообщения были написаны успешно, поэтому не рекомендуется, чтобы порядок был очень важным. Если количество попыток равно 0. Вы можете установить max.in.flight.requests.per.connection равным 1, чтобы, когда производитель пытается отправить первый пакет сообщений, никакие другие сообщения не отправлялись брокеру. Однако это может серьезно повлиять на пропускную способность производителя, поэтому это следует делать только при наличии жестких требований к порядку сообщений.