В этой статье в общих чертах изложены идеи по устранению неполадок, связанных с тайм-аутом контрольной точки: (Эта статья основана на flink-1.4.2)
логика оценки тайм-аута
время менеджера по работеtrigger checkpoint
, отправить сигнал триггера источнику и одновременно запустить асинхронный поток.checkpoint timeout
Остановить раунд по истечении времени
контрольная точка, после выполнения действия отмены контрольная точка этого раунда будет тайм-аут.Если последний оператор стока получен до таймаутаack
сигнал, то контрольная точка прошла успешно.
Так в чем может быть причина тайм-аута?Время тратится либо на выравнивание барьера, либо на асинхронный обход состояния и запись в hdfsВторой тип будет хорошо смотреться, потому что это происходит, когда состояние большое. Феномен, последующий анализ причин возникновения первой категории
Поток обработки барьера
StreamTask
собрал соответствующиеinputChannel
Барьер, после завершения сбора, будет выдан барьер, и будет запущена логика контрольной точки собственной задачи.
В форме переадресации нисходящему потоку нужно ждать только одного параллельного барьера, потому что онpoint-to-point
Да, если это хеширование или ребалансировка, каждая нижестоящая задача начинает контрольную точку
Предпосылка состоит в том, чтобы собрать все параллельные барьеры вверх по течению.
Процесс отправки барьера
RecordWriter#broadcastEvent
Этот метод специально используется для распределения барьеров.Сначала будут распределены буферы, которые не были распределены в сериализаторе, которые были записаны в данные.Предотвратите переопределение барьером поведения доставки данных и обеспечьте согласованность, При этом барьер будет упакован в буферный объект, при этом память вне кучи не будет применяться, а память в куче будет напрямую упакована в буфер для доставки.
PipelinedSubpartition#add
Добавьте данные, которые будут отправлены вниз по течениюsubpartition
, уведомляя слушателяsubpartitionView
,notifybufferAvailable
событие здесьnotify
операция будет дифференцировать
Локальные и удаленные каналы обрабатываются по-разному.То, что мы видим, является проблемой задержки удаленного потребления.На самом деле, локальная область немедленно выполняет действие по сбору нисходящего барьера, а удаленный требует процесса сетевой передачи.
server netty handler
LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestServerHandler => PartitionRequestQueue => messageEncoder
notifybufferAvailable
событие в конечном итоге сработаетPartitionRequestQueue
конвертировать данныеwriteAndFlush
В клиенте netty канал будет оцениваться перед промывкой
Независимо от того, доступен ли он для записи или нет, после успешного сброса будет выполнена соответствующая логика обработки слушателя -> барьера.Здесь вы можете взломать код, чтобы он перезвонил, когда барьер отправляется, чтобы напечатать успешную передачу барьера .
Заставляет нас подтвердить, что барьер был успешно отправлен из восходящего потока.
процесс получения барри
LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestClientHandler => messageEncoder
Во-первых, при чтении сообщения будет судить о наличии накопленияstagedMessages
, если обработки нет, добавить в сообщение о накоплении, если нет копировать данные из нетти-буфера в локалбуфер
, в это время требуется requestBuffer.Этот метод не блокирует процесс потребления, но если запрос не может быть буферизован, данные будут выброшены вstagedMessages
, прослушивая буферный пул,
Когда у буфера есть перезапись, начнется преобразование буфера, и это время установить флаг автоматического чтения канала в false, чтобы этот канал больше не считывал данные, барьер
Это также нечитаемо, и каждый диспетчер задач разделяет канал, поэтому, пока диспетчер задач заблокирован, это повлияет на потребление этого диспетчера задач.
резюме
Как видно из вышеизложенного, по сути, основная причина невозможности выравнивания нижестоящего барьера заключается в том, что мощности потребления ниже по течению недостаточно, что приведет к накоплению буферов на какой-то период времени, но это не достаточно, чтобы вызвать обратное давление вверх по потоку, потому что обратное давление Требуется, чтобы нисходящий канал не мог непрерывно записываться, вызывая блокировку tcp и вызывая переполнение выходного буфера восходящего потока, что вызывает противодавление.
Общий метод исследования
Просмотрите сведения о неудачной контрольной точке в пользовательском интерфейсе, вы увидите, что ошибкаPending -> EventEmit xxx
10 подзадач этого оператора.
Как правило, подзадачи, которые не запускаются из-за выравнивания барьера, имеют время запуска.n/a
, потому что в отчетностиCheckpointStats
Когда чекпоинт вообще не запускается, обратите внимание на 11, 12 здесь
нетsubtaskIndex
, не заблуждайтесь здесь, чтобы посмотреть на проблему этой подзадачи.
Затем перейдите в менеджер заданий, чтобы просмотреть информацию о задержке этой контрольной точки.
2018-11-29 18:43:04,624 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 224 expired before completing.
2018-11-29 18:43:26,763 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from f2862289958b430bc3dc20f39794ca2c of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,766 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 8f569166274106f22e49ed2ce919c930 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,770 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from a29e34c210b39104004af7f067c1a5d0 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,771 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 7d4914521fd53fca56a4050d6f191ae9 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,771 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 618c78d0008d0d525728ff9824339229 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,773 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from c8ba24a328234dc7f2f271db4a8eb1e3 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,777 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from 72af6c722fcc085dc8f7c46e9124d82e of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,777 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from f824cb6920b04d19e05278ee362ec675 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,780 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from af6d867d2f12be23c7b23a938aba7c5e of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:27,265 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 224 from cee0205fe9a85e3e89e023a1166ed1e6 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:44,624 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 225 @ 1543488224622
Вы можете запросить, на какой диспетчер задач попадают эти задачи в соответствии с идентификатором этих невыполненных задач.После расследования выясняется, что это одна и та же машина.Через пользовательский интерфейс мы видим, что приток данных с этой машины явно больше, чем других притоков. Таким образом, эта проблема вызвана искаженными данными, прослеживаемостью или недостаточной пропускной способностью нисходящего потока.
показатель
Есть несколько индикаторов, которые могут отражать работу оператора.
- inPoolUsage
- OutPoolUsage
- OutputQueueLength
- inputQueueLength
Сначала первые 3 значенияразблокированполучены неточные данные,Для производительности самого потребления данных, ссылка не имеет смысла. И inputQueueLength можно использовать для справки, почему это заблокировано, может быть
Сообщество отсутствует. . Максимальное значение должно быть (если раздел хешированный, другие методы раздела будут меньше)上游并发 * 2 + 8 + 上游并发
, если он достигает этого значения или около того, отправить барьер в это время
Он не сможет десериализовать и выполнить правильную операцию контрольной точки ниже по течению.Что касается того, почему последний上游并发
Он открывается отдельно, потому что количество барьеров, представленных этим значением, также будет учитываться количество барьеров.
существуетinputQueueLength
Внутри.
В оптимизированной версии сетевого стека @zhijiangW после flink1.5 сказано, что checkpointBarrier может пропустить данные для отправки первыми.Необходимо подтвердить, как обеспечить ровно один раз семантику обработки в этом случае?