Интервьюер спросил меня, как сделать так, чтобы Кафка не терял сообщения?Я плакала!

Java Kafka

Как kafka гарантирует, что сообщения не будут потеряны

PS: В этой статье много народного! Надеюсь, после просмотра у вас что-то получится.

Друзья, не знающие Кафку, советую ознакомиться со следующими моими статьями, первую надо читать, а остальные учить по мере необходимости.

  1. Начиная! Народный язык знакомит вас с Кафкой!
  2. 5 минут, чтобы познакомиться с Кафкой
  3. Третья часть серии Кафка! Узнайте, как использовать Kafka в качестве очереди сообщений в программе Spring Boot за 10 минут?

Производитель теряет сообщение

Продюсер (продюсер) звонокsendПосле того, как метод отправит сообщение, сообщение может не быть отправлено из-за проблем с сетью.

Итак, мы не можем звонить по умолчаниюsendПосле того, как метод отправляет сообщение, сообщение message отправляется успешно. Чтобы определить, было ли сообщение отправлено успешно, нам нужно судить о результате отправки сообщения. Но следует отметить, что производитель Kafka (Продюсер) используетsendметод отправки сообщения на самом деле является асинхронной операцией, мы можем передатьget()Метод получает результат вызова, но это также делает его синхронной операцией Пример кода выглядит следующим образом:

Подробный код см. в этой моей статье:Третья часть серии Кафка! Узнайте, как использовать Kafka в качестве очереди сообщений в программе Spring Boot за 10 минут?

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
  logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
              sult.getProducerRecord().value().toString());
}

Но это вообще не рекомендуется! Это может быть в виде добавления к нему callback-функции Пример кода выглядит следующим образом:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
        future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
                ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));

Если сообщение не удается отправить, мы проверяем причину сбоя и повторно отправляем его!

Кроме того, рекомендуется как производительretries(Количество повторных попыток) Установите разумное значение, обычно 3, но, чтобы гарантировать, что сообщение не будет потеряно, обычно устанавливается большее значение. После завершения настройки при возникновении проблемы с сетью отправка сообщения может быть автоматически повторена, чтобы избежать потери сообщения. Кроме того, рекомендуется установить интервал повтора, потому что, если интервал слишком мал, эффект повтора не будет очевиден.Как только сеть колеблется, вы будете повторять попытку 3 раза одновременно.

Потребители теряют сообщения

Мы знаем, что сообщениям присваивается определенное смещение, когда они присоединяются к разделу. Смещение (offset) указывает местоположение Раздела, потребляемого в данный момент Потребителем. Kafka гарантирует порядок сообщений внутри раздела посредством смещения.

kafka offset

Когда потребитель извлекает сообщение из раздела, потребитель автоматически отправляет смещение. Будет проблема с автоматической отправкой, представьте, что когда потребитель только что получил сообщение и был готов его принять, он внезапно завис, сообщение фактически не было использовано, но смещение было отправлено автоматически.

Решение также относительно грубое: мы вручную отключаем автоматическую отправку смещения и вручную отправляем смещение после того, как каждое сообщение фактически потребляется.Однако осторожные друзья обязательно обнаружат, что это вызовет проблему повторного потребления сообщений. Например, после того, как вы только что израсходовали сообщение, но не отправили смещение, и сами повесили трубку, то сообщение теоретически будет израсходовано дважды.

Кафка потерял сообщения

Мы знаем, что Kafka представляет механизм Replica для Partition. Между несколькими репликами в разделе будет парень, называемый лидером, а остальные реплики называются последователями. Сообщения, которые мы отправляем, отправляются на ведущую реплику, а затем ведомые реплики могут получать сообщения от ведущей реплики для синхронизации. Производители и потребители взаимодействуют только с репликой лидера. Вы можете понять, что другие реплики — это просто копии ведущей реплики, и они существуют только для обеспечения безопасности хранилища сообщений.

Представим ситуацию: если внезапно зависает брокер, в котором находится копия-лидер, то необходимо перевыбрать лидера из копии-последователя, но если в данных лидера остались какие-то данные, не синхронизированные копией-последователем, он будет привести к потере сообщения.

установить акки = все

Решение состоит в том, что мы устанавливаемacks = all. acks — очень важный параметр для производителей Kafka.

Значение acks по умолчанию равно 1, что означает, что наше сообщение успешно отправлено после его получения ведущей репликой. когда мы настраиваемacks = allЭто означает, что все реплики должны получить сообщение, прежде чем сообщение будет действительно успешно отправлено.

установить replication.factor >= 3

Чтобы убедиться, что реплика-лидер может иметь реплики-последователи для синхронизации сообщений, мы обычно устанавливаем тему дляreplication.factor >= 3. Это гарантирует, что каждый раздел (раздел) имеет как минимум 3 реплики. Хотя возникает избыточность данных, она обеспечивает безопасность данных.

установить min.insync.replicas> 1

В общем, нам также нужно установитьmin.insync.replicas> 1, поэтому конфигурация означает, что сообщение должно быть записано как минимум в 2 реплики, прежде чем оно будет успешно отправлено.min.insync.replicasЗначение по умолчанию 1 равно 1, и значения по умолчанию 1 следует по возможности избегать в реальном производстве.

Однако, чтобы обеспечить высокую доступность всей службы Kafka, необходимо убедиться, чтоreplication.factor > min.insync.replicas. Зачем? Представьте, что вы добавили, что они равны, пока одна копия выйдет из строя, весь раздел не будет работать должным образом. Это явное нарушение высокой доступности! Обычно рекомендуется устанавливатьreplication.factor = min.insync.replicas + 1.

установить unclean.leader.election.enable = false

Начиная с Kafka 0.11.0.0, значение параметра unclean.leader.election.enable по умолчанию было изменено с true на false.

Мы также сказали в начале, что сообщения, которые мы отправляем, будут отправляться на ведущую реплику, а затем реплики-последователи могут получать сообщения от ведущей реплики для синхронизации. Синхронизация сообщений между несколькими репликами подписчиков отличается.unclean.leader.election.enable = falseВ этом случае при сбое ведущей реплики ведущая не будет выбрана из ведомой реплики, чей уровень синхронизации с ведущей не соответствует требованиям, что снижает вероятность потери сообщений.

Reference

  • Официальная документация Кафки:kafka.apache.org/document ATI…
  • Geek Time — Раздел 11 «Основные технологии и практика Kafka»: как добиться конфигурации без потери сообщений?

Рекомендуемое чтение

«Проект с открытым исходным кодом 70k Star Java имеет версию для чтения в формате PDF! 》.

об авторе:Проект Github 70k StarJavaGuide(Публичный аккаунт с таким же именем) Автор. Каждую неделю я буду обновлять некоторые из моих собственных оригинальных галантерейных товаров в общественном аккаунте. За кулисами общественность ответила «1», чтобы получить необходимые учебные материалы для инженеров Java + интервью-сюрприз в формате pdf.