Недавно запущенная на проекте программа Spark для потоковых вычислений внезапно остановилась с ошибкой и не смогла перезапуститься по другим причинам, на выяснение этого ушел день, что едва не привело к катастрофе.
нерешенная головоломка
Проверьте журнал искр и обнаружите, что ошибка выглядит следующим образом:
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
Эта ошибка в основном связана с тем, что смещение потребления Kafka больше, чем максимальное смещение производства, что приводит к отрицательному результату при расчете количества numRecords (numRecords=untilOffset - fromOffset) в этом пакете. Я проверил много информации, и все они говорили, что проблема возникла при удалении исходной темы и восстановлении этой же темы.Ранее у нас не было связанных операций на kafka. К сожалению, воспроизвести эту проблему в настоящее время невозможно, и конкретная причина пока не найдена.Если у кого-то есть какие-либо идеи о подозрительном направлении, пожалуйста, дайте мне знать в комментариях.
В заголовок
новая ошибка
Причину ошибки и выключения пока найти не удается, но сначала программу нужно запустить заново, иначе заказчику придется убить дверь. Но после того, как программа SparkStreaming снова запустилась, она остановилась с ошибкой. Все еще отличается от предыдущей ошибки
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {kafkaTest3-0=4406}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
...
отслеживать исходный код
Получается, что смещение выходит за пределы. Следуйте исходному коду, чтобы увидеть, где получено смещение.
Наконец, в методе #Fetcher.prepareFetchRequests() можно увидеть смещение отsubscriptions
получить из переменной
long position = this.subscriptions.position(partition);
а такжеsubscriptions
Данные в переменной передаются в методе #ConsumerCoordinator.refreshCommittedOffsetsIfNeeded().this.subscriptions.seek(tp, offset)
установить в
public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) {
final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs);
if (offsets == null) return false;
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
final TopicPartition tp = entry.getKey();
final long offset = entry.getValue().offset();
log.debug("Setting offset for partition {} to the committed offset {}", tp, offset);
this.subscriptions.seek(tp, offset);
}
return true;
}
следовать заfetchCommittedOffsets(missingFetchPositions, timeoutMs)
метод
//获取一组分区的提交偏移量。
future = sendOffsetFetchRequest(partitions);
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (future.succeeded()) {
return future.value();
}
Итак, последнее — это смещение потребления текущего раздела, полученного из kafka.
узнать почему
Причина ошибки также неизбежна, потому что время хранения сообщений Kafka в проекте установлено на 1 день, и потребовалось более одного дня для перезапуска SparkStreaming после остановки ошибки, поэтому сообщение, соответствующее смещению потребления потерян, и соответствующее смещение в настоящее время больше не находится в диапазоне смещений, принадлежащем Kafka.
Решение
Зная запись причины, следующим шагом будет изменение записи смещения потребления в kafka.
Есть два способа:
-
Непосредственно устанавливается на смещение самого раннего (самого раннего) текущего раздела
-
Маловероятно, что первый метод приведет к истечению срока действия данных во время перезапуска SparkStreaming после установки нового смещения. Таким образом, вы можете запросить журнал данных производителя Kafka и определить смещение через следующий индексный файл. Или снова выполните первый метод.
//查询该组下所有topic的offset信息
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --describe --group spark-kafka
//修改某个topic某个组下的消费offset为当前生产者earliest-offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-earliest
//修改某个topic某个组下的消费offset为1500(指定offset)
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.128:9092 --group spark-kafka --topic kafkaTest3 --execute --reset-offsets --to-offset 1500
//查询某topic下各个分区当前最大的消息位移值(注意,这里的位移不是consumer端的位移,而是指消息在每个分区的位置)
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -1
//表示去获取当前各个分区的最小位移(earliest)。把运行上一条命令的结果与这条命令的结果相减就是集群中该topic的当前消息总数。
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.19.128:9092 --topic kafkaTest3 --time -2