SparkStreaming читает Kafka и сообщает OffsetOutOfRangeException

Kafka

Недавно запущенная на проекте программа Spark для потоковых вычислений внезапно остановилась с ошибкой и не смогла перезапуститься по другим причинам, на выяснение этого ушел день, что едва не привело к катастрофе.

нерешенная головоломка

Проверьте журнал искр и обнаружите, что ошибка выглядит следующим образом:

 java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

Эта ошибка в основном связана с тем, что смещение потребления Kafka больше, чем максимальное смещение производства, что приводит к отрицательному результату при расчете количества numRecords (numRecords=untilOffset - fromOffset) в этом пакете. Я проверил много информации, и все они говорили, что проблема возникла при удалении исходной темы и восстановлении этой же темы.Ранее у нас не было связанных операций на kafka. К сожалению, воспроизвести эту проблему в настоящее время невозможно, и конкретная причина пока не найдена.Если у кого-то есть какие-либо идеи о подозрительном направлении, пожалуйста, дайте мне знать в комментариях.

В заголовок

новая ошибка

Причину ошибки и выключения пока найти не удается, но сначала программу нужно запустить заново, иначе заказчику придется убить дверь. Но после того, как программа SparkStreaming снова запустилась, она остановилась с ошибкой. Все еще отличается от предыдущей ошибки

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {kafkaTest3-0=4406}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    ...

отслеживать исходный код

Получается, что смещение выходит за пределы. Следуйте исходному коду, чтобы увидеть, где получено смещение.
Наконец, в методе #Fetcher.prepareFetchRequests() можно увидеть смещение отsubscriptionsполучить из переменной

long position = this.subscriptions.position(partition);

а такжеsubscriptionsДанные в переменной передаются в методе #ConsumerCoordinator.refreshCommittedOffsetsIfNeeded().this.subscriptions.seek(tp, offset)установить в

public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) {
        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();

        final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs);
        if (offsets == null) return false;

        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            final TopicPartition tp = entry.getKey();
            final long offset = entry.getValue().offset();
            log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
            this.subscriptions.seek(tp, offset);
        }
        return true;
    }

следовать заfetchCommittedOffsets(missingFetchPositions, timeoutMs)метод

//获取一组分区的提交偏移量。
future = sendOffsetFetchRequest(partitions);
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (future.succeeded()) {
    return future.value();
}

Итак, последнее — это смещение потребления текущего раздела, полученного из kafka.

узнать почему

Причина ошибки также неизбежна, потому что время хранения сообщений Kafka в проекте установлено на 1 день, и потребовалось более одного дня для перезапуска SparkStreaming после остановки ошибки, поэтому сообщение, соответствующее смещению потребления потерян, и соответствующее смещение в настоящее время больше не находится в диапазоне смещений, принадлежащем Kafka.

Решение

Зная запись причины, следующим шагом будет изменение записи смещения потребления в kafka.

Общие команды Кафки

Есть два способа:

  1. Непосредственно устанавливается на смещение самого раннего (самого раннего) текущего раздела

  2. Маловероятно, что первый метод приведет к истечению срока действия данных во время перезапуска SparkStreaming после установки нового смещения. Таким образом, вы можете запросить журнал данных производителя Kafka и определить смещение через следующий индексный файл. Или снова выполните первый метод.

//查询该组下所有topic的offset信息
./kafka-consumer-groups.sh  --bootstrap-server 192.168.19.128:9092 --describe --group spark-kafka

//修改某个topic某个组下的消费offset为当前生产者earliest-offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-earliest

//修改某个topic某个组下的消费offset为1500(指定offset)
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-offset 1500

//查询某topic下各个分区当前最大的消息位移值(注意,这里的位移不是consumer端的位移,而是指消息在每个分区的位置)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -1

//表示去获取当前各个分区的最小位移(earliest)。把运行上一条命令的结果与这条命令的结果相减就是集群中该topic的当前消息总数。
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -2