Общие идеи по устранению неполадок, связанных с тайм-аутом контрольной точки Flink

Flink

В этой статье в общих чертах изложены идеи по устранению неполадок, связанных с тайм-аутом контрольной точки: (Эта статья основана на flink-1.4.2)

логика оценки тайм-аута

время менеджера по работеtrigger checkpoint, отправить сигнал триггера источнику и одновременно запустить асинхронный поток.checkpoint timeoutОстановить раунд по истечении времени контрольная точка, после выполнения действия отмены контрольная точка этого раунда будет тайм-аут.Если последний оператор стока получен до таймаутаackсигнал, то контрольная точка прошла успешно.

Так в чем может быть причина тайм-аута?Время тратится либо на выравнивание барьера, либо на асинхронный обход состояния и запись в hdfsВторой тип будет хорошо смотреться, потому что это происходит, когда состояние большое. Феномен, последующий анализ причин возникновения первой категории

Поток обработки барьера

StreamTaskсобрал соответствующиеinputChannelБарьер, после завершения сбора, будет выдан барьер, и будет запущена логика контрольной точки собственной задачи. В форме переадресации нисходящему потоку нужно ждать только одного параллельного барьера, потому что онpoint-to-pointДа, если это хеширование или ребалансировка, каждая нижестоящая задача начинает контрольную точку Предпосылка состоит в том, чтобы собрать все параллельные барьеры вверх по течению.

Процесс отправки барьера

RecordWriter#broadcastEvent

Этот метод специально используется для распределения барьеров.Сначала будут распределены буферы, которые не были распределены в сериализаторе, которые были записаны в данные.Предотвратите переопределение барьером поведения доставки данных и обеспечьте согласованность, При этом барьер будет упакован в буферный объект, при этом память вне кучи не будет применяться, а память в куче будет напрямую упакована в буфер для доставки.

PipelinedSubpartition#add

Добавьте данные, которые будут отправлены вниз по течениюsubpartition, уведомляя слушателяsubpartitionView,notifybufferAvailableсобытие здесьnotifyоперация будет дифференцировать Локальные и удаленные каналы обрабатываются по-разному.То, что мы видим, является проблемой задержки удаленного потребления.На самом деле, локальная область немедленно выполняет действие по сбору нисходящего барьера, а удаленный требует процесса сетевой передачи.

server netty handler

LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestServerHandler => PartitionRequestQueue => messageEncoder notifybufferAvailableсобытие в конечном итоге сработаетPartitionRequestQueueконвертировать данныеwriteAndFlushВ клиенте netty канал будет оцениваться перед промывкой Независимо от того, доступен ли он для записи или нет, после успешного сброса будет выполнена соответствующая логика обработки слушателя -> барьера.Здесь вы можете взломать код, чтобы он перезвонил, когда барьер отправляется, чтобы напечатать успешную передачу барьера . Заставляет нас подтвердить, что барьер был успешно отправлен из восходящего потока.

процесс получения барри

LengthFieldBasedFrameDecoder => messageDecode => PartitionRequestClientHandler => messageEncoderВо-первых, при чтении сообщения будет судить о наличии накопленияstagedMessages, если обработки нет, добавить в сообщение о накоплении, если нет копировать данные из нетти-буфера в локалбуфер , в это время требуется requestBuffer.Этот метод не блокирует процесс потребления, но если запрос не может быть буферизован, данные будут выброшены вstagedMessages, прослушивая буферный пул, Когда у буфера есть перезапись, начнется преобразование буфера, и это время установить флаг автоматического чтения канала в false, чтобы этот канал больше не считывал данные, барьер Это также нечитаемо, и каждый диспетчер задач разделяет канал, поэтому, пока диспетчер задач заблокирован, это повлияет на потребление этого диспетчера задач.

резюме

Как видно из вышеизложенного, по сути, основная причина невозможности выравнивания нижестоящего барьера заключается в том, что мощности потребления ниже по течению недостаточно, что приведет к накоплению буферов на какой-то период времени, но это не достаточно, чтобы вызвать обратное давление вверх по потоку, потому что обратное давление Требуется, чтобы нисходящий канал не мог непрерывно записываться, вызывая блокировку tcp и вызывая переполнение выходного буфера восходящего потока, что вызывает противодавление.

Общий метод исследования

Просмотрите сведения о неудачной контрольной точке в пользовательском интерфейсе, вы увидите, что ошибкаPending -> EventEmit xxx10 подзадач этого оператора.

checkpoint

Как правило, подзадачи, которые не запускаются из-за выравнивания барьера, имеют время запуска.n/a, потому что в отчетностиCheckpointStatsКогда чекпоинт вообще не запускается, обратите внимание на 11, 12 здесь нетsubtaskIndex, не заблуждайтесь здесь, чтобы посмотреть на проблему этой подзадачи.

checkpoint

Затем перейдите в менеджер заданий, чтобы просмотреть информацию о задержке этой контрольной точки.

2018-11-29 18:43:04,624 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 224 expired before completing.
2018-11-29 18:43:26,763 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from f2862289958b430bc3dc20f39794ca2c of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,766 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from 8f569166274106f22e49ed2ce919c930 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,770 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from a29e34c210b39104004af7f067c1a5d0 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,771 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from 7d4914521fd53fca56a4050d6f191ae9 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,771 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from 618c78d0008d0d525728ff9824339229 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,773 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from c8ba24a328234dc7f2f271db4a8eb1e3 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,777 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from 72af6c722fcc085dc8f7c46e9124d82e of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,777 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from f824cb6920b04d19e05278ee362ec675 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:26,780 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from af6d867d2f12be23c7b23a938aba7c5e of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:27,265 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 224 from cee0205fe9a85e3e89e023a1166ed1e6 of job 7c7769847d333438dd9ce845d5a2d980.
2018-11-29 18:43:44,624 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 225 @ 1543488224622

Вы можете запросить, на какой диспетчер задач попадают эти задачи в соответствии с идентификатором этих невыполненных задач.После расследования выясняется, что это одна и та же машина.Через пользовательский интерфейс мы видим, что приток данных с этой машины явно больше, чем других притоков. Таким образом, эта проблема вызвана искаженными данными, прослеживаемостью или недостаточной пропускной способностью нисходящего потока.

показатель

Есть несколько индикаторов, которые могут отражать работу оператора.

  • inPoolUsage
  • OutPoolUsage
  • OutputQueueLength
  • inputQueueLength

Сначала первые 3 значенияразблокированполучены неточные данные,Для производительности самого потребления данных, ссылка не имеет смысла. И inputQueueLength можно использовать для справки, почему это заблокировано, может быть Сообщество отсутствует. . Максимальное значение должно быть (если раздел хешированный, другие методы раздела будут меньше)上游并发 * 2 + 8 + 上游并发, если он достигает этого значения или около того, отправить барьер в это время Он не сможет десериализовать и выполнить правильную операцию контрольной точки ниже по течению.Что касается того, почему последний上游并发Он открывается отдельно, потому что количество барьеров, представленных этим значением, также будет учитываться количество барьеров. существуетinputQueueLengthВнутри.

В оптимизированной версии сетевого стека @zhijiangW после flink1.5 сказано, что checkpointBarrier может пропустить данные для отправки первыми.Необходимо подтвердить, как обеспечить ровно один раз семантику обработки в этом случае?