описание проблемы
За последние несколько дней было обнаружено, что у Kafka повторяются проблемы с потреблением в производственной среде.В журнале мы можем обнаружить, что часто возникает сигнал журнала о перебалансировке:
WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Synchronous auto-commit of offsets {am_performance_topic-0=OffsetAndMetadata{offset=27914, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Revoking previously assigned partitions [am_performance_topic-0]
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [am_performance_topic-0]
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] (Re-)joining group
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Successfully joined group with generation 474
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Setting newly assigned partitions [am_performance_topic-0]
INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerCont
Для моего слабого кафки, который мало что знает о мелочах, единственный способ - поискать в Интернете. Вот мои сообщения в блоге, которые помогли решить проблему:
Разница между параметрами Kafka session.timeout.ms heartbeat.interval.ms и некоторыми мыслями о хранении данных
Подробное объяснение значения Kafka auto.offset.reset
Потребительский параметр Kafka auto.offset.reset
Подробное объяснение конфигурации производителя-потребителя spring-kafka
Разговор о Координаторе и механизме ребалансировки Kafka Consumer Group
анализировать
Давайте проанализируем следующие журналы предупреждений:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Можно получить несколько ключевых параметровmax.poll.interval.ms
,max.poll.records
так же какsession timeout
. Простой перевод этого журнала
Не удалось выполнить фиксацию, поскольку группа kafka была перебалансирована и раздел был назначен другому участнику. Обычно это означает, что цикл опроса тратит слишком много времени на обработку сообщений, больше, чем настроенный max.poll.interval.ms. Вы можете исправить это, увеличив время ожидания сеанса или уменьшив максимальный размер пакета (max.poll.records), возвращаемый функцией poll().
Давайте сначала разберемся со значением следующих параметров, согласноРазница между параметрами Kafka session.timeout.ms heartbeat.interval.ms и некоторыми мыслями о хранении данных:
Поскольку тема часто имеет несколько разделов, и мы создадим несколько потребителей в группе потребителей для использования этой темы, поэтому возникает вопрос: какие сообщения раздела отправляются какому потребителю для потребления? Здесь задействованы три понятия: группа потребителей, потребитель в группе потребителей, и у каждой группы потребителей есть координатор группы. Назначение потребительских разделов осуществляется через протокол группового управления;
Каждый потребитель в группе потребителей отправляет запрос JoinGroup координатору группы, так что координатор группы имеет информацию о членах всех потребителей, поэтому он выбирает потребителя в качестве ведущего потребителя и сообщает ведущему потребителю: Вы берете эту информацию об участнике и Позвольте мне предоставить информацию о разделе вашей темы, чтобы определить, какие потребители несут ответственность за использование каких разделов.
Затем ведущий потребитель вычисляет раздел, который будет использоваться для каждого потребителя в соответствии с настроенной нами стратегией распределения (заданной параметром partition.assignment.strategy). Таким образом, каждый потребитель отправляет запрос SyncGroup координатору группы, но только запрос ведущего потребителя имеет стратегию выделения раздела.После того как координатор группы получит план выделения раздела ведущего потребителя, он отправляет план каждому потребителю. Нарисуйте такую картинку:
В нормальных условиях перебалансировка запускается, когда потребитель входит в группу потребителей или покидает ее.Так называемая перебалансировка заключается в переформулировании плана распределения разделов. При формировании плана распределения разделов каждый потребитель должен быть своевременно уведомлен, что связано с параметром heartbeat.interval.ms. А именно: каждый потребитель будет периодически отправлять хербит координатору группы в соответствии со временем, указанным в параметре heartbeat.interval.ms, и координатор группы будет отвечать каждому потребителю. REBALANCE_IN_PROGRESS, чтобы каждый потребитель знал, что произошла перебалансировка, а координатор группы также знал о выживании каждого потребителя.
Так зачем сравнивать heartbeat.interval.ms с session.timeout.ms? session.timeout.ms указывает время, которое требуется координатору группы для обнаружения сбоя потребителя. Потребитель в группе потребителей вешает трубку, и для его обнаружения требуется session.timeout.ms секунд. Например, session.timeout.ms=10, heartbeat.interval.ms=3
session.timeout.ms - "логический" индикатор, который указывает порог --- 10 секунд.Если координатор не получает никаких сообщений от потребителя в пределах этого порога, координатор считает, что потребитель завис. Heartbeat.interval.ms — это «физический» индикатор, который говорит потребителю отправлять пакет пульса координатору каждые 3 секунды.Чем меньше heartbeat.interval.ms, тем больше пакетов пульса отправляется, что влияет на количество Количество отправленных TCP-пакетов оказывает реальное влияние, поэтому я называю его «физическим» показателем.
Если координатор группы не получит сигнал пульса потребителя в течение периода heartbeat.interval.ms, он переместит потребителя из группы, что немного неразумно. Это как если бы потребитель сделал небольшую ошибку и убил ее одной палкой. На самом деле могут быть задержки в сети, а в потребителе может быть долговременный GC, который влияет на приход пакета heartbeat, и, возможно, следующий heartbeat будет нормальным.
Heartbeat.interval.ms должен быть меньше, чем session.timeout.ms.Если группа потребителей перебалансирована, потребитель может узнать, что перебалансировка произошла во времени, через REBALANCE_IN_PROGRESS в пакете пульса, тем самым обновив потребляющий раздел потребителя. И если session.timeout.ms превышено, то координатор группы считает, что потребитель завис, и конечно нет необходимости сообщать потребителю информацию о ребалансировке.
В версиях после kafka0.10.1, session.timeout.ms и max.poll.interval.ms разделены. То есть после нового объекта KafkaConsumer в процессе выполнения Consumer.poll в цикле while true для извлечения сообщений за ним фактически стоят два потока, то есть экземпляр потребителя kafka содержит два потока:Один поток сердцебиения, а другой поток обработки, поток обработки можно понимать как поток, который вызывает метод Consumer.poll для выполнения логики обработки сообщений, а поток пульса — это фоновый поток, который «скрыт» от программиста. Если логика обработки сообщений очень сложная, например, требуется обработка 5 минут, то для max.poll.interval.ms можно установить значение больше 5 минут. Поток пульса связан с упомянутым выше параметром heartbeat.interval.ms.Поток пульса отправляет пакет пульса координатору каждые heartbeat.interval.ms, чтобы доказать, что он все еще жив. Пока поток пульса отправляет пакет пульса координатору в пределах session.timeout.ms, координатор группы считает, что текущий потребитель kafka жив.
До kafka0.10.1 два процесса отправки пакетов Heartbeat и логика обработки сообщений были связаны вместе.Представьте: если время обработки сообщения занимает 5 минут, а session.timeout.ms=3000ms, то подождите, пока потребитель kafka обработает сообщение. сообщение, Координатор группы уже переместил получателя из группы. Поскольку имеется только один поток, он не может отправить пакет контрольных сигналов координатору группы во время обработки сообщения. Если пакет контрольных сигналов не отправляется более 3000 мс, координатор группы вытеснит потребителя из группы. Чтобы разделить их, поток обработки отвечает за выполнение логики обработки сообщений, а поток контрольных сообщений отвечает за отправку пакетов контрольных сообщений, тогда: даже если сообщение необходимо обрабатывать в течение 5 минут, пока нижний поток контрольных сообщений отправляет пакет пульса координатору группы по адресу session.timeout.ms. Потребитель может продолжать обрабатывать сообщения, не беспокоясь о том, что его удалят из группы. Другое преимущество заключается в том, что если есть проблема с потребителем, ее можно обнаружить в течение session.timeout.ms вместо ожидания max.poll.interval.ms.
краткое содержание
Таким образом, при срабатывании сбалансированного повторного потребления есть две причины:
- Во-первых, пакет пульса не получен в течение session.timeout.ms, и поток пульса будет отправлять пульс каждый интервал heartbeat.interval.ms.
- Другой заключается в том, что в течение времени max.poll.interval.ms сообщения с максимальным количеством max.poll.records, захваченные потребителем kafka в последний раз, не были обработаны, и смещение обновления не могло быть возвращено в течение времени max.poll.interval.ms Информация.
Итак, проблема на этот раз тоже очень проста, это второй случай выше, решение состоит в том, чтобы увеличить таймаут max.poll.interval.ms и уменьшить max.poll.records (количество сообщений за опрос). Если вы используете фреймворк spring-kafka, вы можете обратиться к тому, как писать параметры в этой части.Spring Boot Reference Guideи сообщение в блоге вышеПодробное объяснение конфигурации производителя-потребителя spring-kafka. Последняя модификация выглядит следующим образом:
kafka:
producer:
bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: true
group-id: boot_kafka
auto-commit-interval: 100
auto-offset-reset: earliest
max-poll-records: 50
bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
properties:
max.poll.interval.ms: 1200000
max.poll.interval.ms
Значение по умолчанию 300000 (5 минут),max.poll.records
Значение по умолчанию — 500. Дополнительные описания параметров см.Apache Kafka Documentation
Давайте снова поговорим о поле автоматического смещения-сброса
значение auto.offset.reset значение объяснение
- earliest
Когда в каждом разделе есть отправленное смещение, начать потребление с отправленного смещения; когда нет отправленного смещения, начать потребление с нуля- latest
Когда в каждом разделе есть отправленное смещение, начать потребление с отправленного смещения; когда нет отправленного смещения, потреблять вновь сгенерированные данные в разделе- none
Когда в каждой партиции топика есть зафиксированное смещение, потребление начинается после смещения; пока в одной партиции нет зафиксированного смещения, выбрасывается исключение
Процесс тестирования:
-
самый ранний режим: имя источника кафки - a1
-
В a1 тема test1, groupId 0001, 0001 ни разу не использовалась, данные (24 штуки) отправляются заранее, а затем запускается sql1 (выбрать * из a1), он начнет потребление с нуля и отобразит 24 штуки данные
-
Остановите sql1, упомянутый в 1, отправьте разные 6 частей данных в kafka, не меняйте groupId a1, а затем запустите sql1 (выберите * из a1), он начнет потреблять с последней позиции потребления и отобразит 6 статьи данные
-
-
последний режим: имя источника кафки - a2
-
В a2 тема b, groupId 0002, 0002 не использовано, данные отправляются заранее, а затем запускается sql2 (выберите * из a2), на jmeter не видно результатов, а связанные метрики просматриваются во flink, данные не читаются; исходя из того, что sql2 не уничтожается, отправьте пакет (8 частей) данных и используйте только 8 частей данных, отправленных после потребления.
-
Остановите sql2 в 1, не изменяйте groupId в a2, отправьте 7 фрагментов данных в b, запустите sql2, отобразите только 7 фрагментов данных, отправленных позже
-
-
нет режима: имя источника кафки - a3
-
В а3 тема с, groupId установлен в 0001 (неиспользованный), данные отправляются заранее, а затем запускается sql3 (выбрать * из а3), выполнение sql не выполняется, а в логе сообщается об ошибке .
-
В a3 тема c, установите для groupId значение 0002 (использовано), запустите sql3 (выберите * из a3), отправьте 8 фрагментов данных в c и отобразите 8 фрагментов данных в jmeter.
-