предисловие
Поиск в WeChat【Java3y] Подписывайтесь на этого незатейливого мужика, ставьте лайки и подписывайтесь - это самая большая поддержка для меня!
Текст был включен в мой GitHub:GitHub.com/Zhongf UC очень…, с более чем 300 оригинальными статьями, недавно опубликованнымиИнтервью и проектыряд!
недавно мигрировалFlink
Связанные проекты также наступили на некоторые ямы в течение периода.checkpoint
а также反压
является одним из них.
Ао Бин слишком хорош.Flink
Нет, я могу сделать это только сам.Смотреть на Ао Бин можно только с удовольствием, учиться приемам все равно придется смотреть на три кривых
Обычно у меня нет никакого уровня Ао Бинхей, поэтому, если я возьму что-то простое, я скажу, что не могу.Я черный фанат Ао Бин номер один
сегодня поделитьсяFlink
изcheckpoint
механизм и背压
Впринципе, я считаю, что через эту статью все играютFlink
получить более глубокое пониманиеCheckpoint
Как это реализовано, и может быть удобнее при настройке соответствующих параметров и использовании.
Предыдущая статья была написанаFlink
Вводный урок по , если вы еще этого не знаетеFlink
студенты могут пойти посмотреть: "Начало работы с Флинком》
Напоминание в первом ряду: эта статья основана на Flink 1.7.
"Мелкий вход и выходИзучите знания Flink о противодавлении"
закуска
объяснениеFlink
изcheckPoint
а также背压
Прежде чем механизм, давайте посмотрим наcheckpoint
а также背压
Соответствующие основы полезны для последующего понимания.
Как пользователи, мы пишемFlink
Программа представлена на платформе управления,Flink
Он работает (пока нет проблем с программным кодом), а подробности скрыты от пользователя.
На самом деле общий процесс выглядит следующим образом:
-
Flink
Согласно коду, который мы написали, он сгенерируетStreamGraph
Диаграмма выходит, чтобы представить топологию программы, которую мы написали. - затем перед совершением
StreamGraph
эта картинкаоптимизацияГорстка (задачи, которые можно объединить, чтобы объединиться), статьJobGraph
- будет
JobGraph
подчинятьсяJobManager
-
JobManager
после полученияJobGraph
позже согласноJobGraph
генерироватьExecutionGraph
(ExecutionGraph
даJobGraph
распараллеленная версия) -
TaskManager
После получения задания онExecutionGraph
генерируется как реальный物理执行图
можно увидеть物理执行图
действительно бежатьTaskManager
начальствоTransform
а такжеSink
междуResultPartition
а такжеInputGate
Эти два компонента,ResultPartition
используется для отправки данных иInputGate
используется для получения данных.
заблокировать этиGraph
, его можно найтиFlink
Схема:Client
->JobManager
->TaskManager
Как видно из названия,JobManager
это "управление", иTaskManager
действительно работает. Вернемся к нашей сегодняшней теме,checkpoint
кJobManager
проблема.
а такжеFlink
сам по себесостояниеиз,Flink
позволяет вам выбиратьданные во время выполненияГде сохраняться, на данный момент есть три места, вFlink
угол называетсяState Backends
:
- MemoryStateBackend(ОЗУ)
- FsStateBackend(файловая система, обычно HSFS)
- RocksDBStateBackend(база данных RocksDB)
такой же,checkpoint
Информация также хранится вState Backends
начальство
крысиное дерьмо
недавно вStorm
мигрироватьFlink
Когда я сталкиваюсь с проблемой, позвольте мне кратко описать предысторию.
Мы очищаем данные из различных источников данных, с помощьюFlink
почистили, собрали в широкую модель и, наконец, передалиkylin
Делайте статистику данных практически в реальном времени и отображайте для просмотра операций в реальном времени.
В процессе миграции обнаруживается, чтоtopic
Потребление было задержано в течение длительного времени, и первоначальное подозрение связано с тем, что заказ находится выше по течению.并发度
Недостаточно повлиять, поэтому я настроил параллелизм на обоих концах и переиздал.
В процессе публикации, после того, как система заработает, посмотрите на нее еще раз.topic
Мониторинг задержек потребления сбивает с толку. Какие? Почему так долго? Это вовсе не означало опускаться.
В это время я могу только просить помощи у великих богов в группе.После расследования он сказал: Этоcheckpoint
Я не делал это все время, это все заблокировано, и оно будет выпущено только в последний раз, когда оно будет переиздано.checkpoint
начать, потому чтоcheckpoint
Она давно не дорабатывалась, поэтому объем переопубликованных данных будет очень большим. Нет хорошего способа сделать это, его можно только выкинуть под этой заблокированной ссылкой, по оценкам, есть проблема с бизнес-логикой.
Голос за кадром: Получив данные заказа, перейдем кОтслеживаемостьНажмите, чтобы определить, из какой компании поступил заказ, через какую компанию он прошел и какая компания в конечном итоге привела к выполнению заказа.
Голос за кадром: Когда внешнее используется,полагатьсяДанные "Результат заказа HBase"
Мы считаем, что данные по щелчку могут быть медленнее, чем обработка данных заказа, поэтому данные, которые не могут быть найдены, будут опрашиваться через определенные промежутки времени, и посколькуFlink
поставкаState
«статус» иcheckpoint
механизм, мы помещаем данные, которые не могут быть найдены вListState
Просто опрос в определенное время (даже если система зависнет из-за перезагрузки или других причин, данные не потеряются).
Теоретически, пока нет проблем, эта схема осуществима. Но теперь результат говорит нам: после того, как приходит отчет с данными о заказе,небольшой пакет данныхЕсли данные не найдены в «HBase результатов заказа», они помещаются вListState
, а затем пройти часть данныхListState
. Последствия:
- Потребление данных не может прийти, формаобратное давление
-
checkpoint
никогда не удавалось
Способ справиться с этим в то время заключался в том, чтобы очистить ListState, временно отбросить эту часть данных и позволить данным идти в ногу с прогрессом.
Позже, после расследования, было обнаружено, что апстрим «манипулировал» полем сообщения, и сбой парсинга привел к потере кликов, что привело к этой цепочке последствий.
Ключом к устранению неполадок является пониманиеFlink
из反压
а такжеcheckpoint
В чем принцип этого, позвольте мне объяснить это ниже.
обратное давление
обратное давлениеbackpressure
Это очень распространенная проблема в потоковых вычислениях.Это означает, что узел в конвейере данных становится узким местом, скорость обработки не может угнаться за скоростью отправки данных «вверх по течению», а восходящему потоку необходимо ограничить скорость.
На картинке выше показано состояние минимального противодавления, грубо говоря, это:Нисходящий поток не может с этим справиться, восходящий поток должен замедлиться, он будет заблокирован!
Самое интересное: "Как нисходящий поток уведомил восходящий поток о необходимости отправлять медленнее?"
спередиFlink
Основы объяснения мы видимResultPartition
отправить данные,InputGate
используется для получения данных.
а такжеFlink
вTaskManager
При чтении и записи данных внутри будетBufferPool
(буферный пул) для этогоTaskManager
читать и писать использовать (аTaskManager
поделиться однимBufferPool
), каждый читает и пишетResultPartition/InputGate
будут обращаться за своимиLocalBuffer
В качестве примера взято изображение выше.Если нисходящая обработка не может быть обработана, тоInputGate
изLocalBuffer
Он заполнен? После заполнения,ResultPartition
невозможно ли пойтиInputGate
Опубликовано? а такжеResultPartition
Если вы не можете отправить его, это его собственноеLocalBuffer
Он будет заполнен рано или поздно, разве что по этой логике, покаSource
данные не вытягиваются...
Этот процесс подобенInputGate/ResultPartition
открывать свои собственныеОграниченная очередь блокировки, так или иначе, "я" не могу справиться с таким количеством, пришлите мне, и я заблокирую его, когда оно будет заполнено, образуя цепную реакцию, чтобы заблокировать источник...
выше только одинTaskManager
случае противодавления, что многократноTaskManager
Шерстяная ткань? (В конце концов, у нас часто бывает несколькоTaskManager
работает у нас)
давайте оглянемся назадFlink
Общая схема архитектуры потока данных связи:
Наглядно видно из рисунка: для телекоммуникацийNetty
, нижний слойTCP Socket
быть реализованным.
Таким образом, с точки зрения макроэкономики несколькоTaskManager
Еще дваBuffer
(буфер).
Согласно вышеприведенной идее, до тех пор, покаInputGate
изLocalBuffer
быть избитым,Netty Buffer
рано или поздно он будет заполнен, иSocket Buffer
Он тоже рано или поздно заполнится (TCP сам имеет управление потоком), а потом обратная связь наResultPartition
, данные не могут быть отправлены снова... что приводит к феномену обратного давления на весь канал передачи данных.
Теперь проблема возникает снова,TaskManager
изtask
Но их много, они всеобщийОдинTCP Buffer/Buffer Pool
, то только один изtask
Есть проблема со ссылкой, которая не вызывает весьTaskManager
Следить за катастрофой?
существуетFlink 1.5版本
Раньше действительно была такая проблема. пока вFlink 1.5版本
затем представилcredit
механизм.
Сверху мы видимFlink
Реализованное противодавление макроскопически напрямую зависит от каждогоBuffer
Он заполнен, если он заполнен, он не может писать/читать, вызывая цепную реакцию, покаSource
конец.
а такжеcredit
На самом деле этот механизм можно просто понимать как «мелкозернистое» управление потоком: каждый разInputGate
скажуResultPartition
Сколько свободного места вы должны получить, пустьResultPartition
Глядя на волосы. еслиInputGate
РассказыватьResultPartition
Свободного места нет, поэтомуResultPartition
Он не будет опубликован.
Как это достигается на самом деле? Получите исходный код!
Прежде чем развернуть исходный код, давайте взглянем на следующееДиаграмма физического исполнения: фактическиInPutGate
следующийInputChannel
,ResultPartition
следующийResultSubpartition
(Это отражено в исходном коде).
InputGate (приемный конец обрабатывает противодавление)
Начнем с принимающей стороны.Flink
Метод получения данныхorg.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput
Затем найдите логику для обработки противодавления:
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
входитьgetNextNonBlocked()
метод (выбранBarrierBuffer
выполнить):
мы посмотрим прямоnull
, см. отфаза инициализацииКак это началось, заходиgetNextBufferOrEvent()
Заходим в метод и видим еще два важных вызова:
requestPartitions();
result = currentChannel.getNextBuffer();
начать сrequestPartitions()
Посмотрите на него и обнаружите, что внутри есть слой (отInputChannel
получить следующееsubPartition
):
Так что идиrequestSubpartition()
(СмотретьRemoteInputChannel
пойми)
Здесь это похоже на созданиеClient
end, а затем получить данные, отправленные восходящим потоком:
Первый взглядclient
Давайте создадим позу в конце, введитеcreatePartitionRequestClient()
способ увидеть (мы видимNetty
реализация).
Нажмите на два этажа, и мы войдемcreatePartitionRequestClient()
метод, вы можете легко найти его, просмотрев комментарии к исходному коду, которые создадутTCP
подключить и создатьClient
для нашего использования
мы все еще видимnull
, поэтому он находится здесь:
входитьconnect()
Смотри как:
Давайте посмотрим на реализацию конкретной логики генерации, поэтому перейдите кgetClientChannelHandlers
начальство
Случайно обнаружил, что в исходном коде также есть краткая блок-схема связи, которую мы можем увидеть (хахаха):
Хорошо, давайте посмотримgetClientChannelHandlers
Метод, этот метод не длинный, в основном, чтобы судить о следующем поколенииclient
Открыть лиcreditBased
механизм:
public ChannelHandler[] getClientChannelHandlers() {
NetworkClientHandler networkClientHandler =
creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
new PartitionRequestClientHandler();
return new ChannelHandler[] {
messageEncoder,
new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
networkClientHandler};
}
Итак, нашnetworkClientHandler
экземплярCreditBasedPartitionRequestClientHandler
Здесь мы предполагаем на данный моментClient
Терминал был сгенерирован, а затемВернуться кgetNextBufferOrEvent()
Сюда,requestPartitions()
Метод заключается в генерации полученных данныхClient
конец, конкретный примерCreditBasedPartitionRequestClientHandler
Далее мы идемgetNextBuffer()
Посмотрите, как обрабатываются полученные данные:
После получения данных он начнет выполнять вызов кода нашего пользователя.process
метод (здесь мы его рассматривать не будем). или вернутьсяобратное давлениеЛогически, мы, кажется, не видим, где логика обратного давления. Дело в томreceivedBuffers
Вот кто поставил?
Итак, мы видимClient
конкретные примерыCreditBasedPartitionRequestClientHandler
, открой список методов и посмотри на него, вроде какChannelRead()
Вот так:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
следитьdecodeMsg
Продолжайте идти вниз:
продолжить вниз кdecodeBufferOrEvent()
продолжить вниз кonBuffer
:
Итак, мы идем кonSenderBacklog
Проверьте это:
последний звонокnotifyCreditAvailable
будетCredit
Отправить вверх по течению:
public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
}
Наконец, нарисуйте картинку, чтобы понять одну (ключевая ссылка):
ResultPartition (отправитель обрабатывает противодавление)
На стороне отправителя мы начинаем сorg.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager
начать смотреть
Итак, мы входимfromConfiguration()
входитьstart()
смотри, затем вводиconnectionManager.start()
(Еще см.Netty
пример):
зайди и посмотриservice.init()
Что делает метод (и снова видим знакомый рисунок):
Хорошо, давай еще разgetServerChannelHandlers()
посмотри:
Имея описанный выше опыт, мы можем напрямую войти и увидеть его метод, да, это сноваchannnelRead
, только в этот разchannelRead0
.
хорошо, давай зайдемaddCredit()
посмотри:
reader.addCredit(credit)
только что обновил номер
public void addCredit(int creditDeltas) {
numCreditsAvailable += creditDeltas;
}
Давайте посмотрим на точкуenqueueAvailableReader()
метод, в то время какenqueueAvailableReader()
Суть в том, чтобы судитьCredit
Достаточно ли отправить
isAvailable
Реализация тоже очень простая, то есть судитьCredit
Является ли оно больше 0 и есть ли реальные данные для отправки
а такжеwriteAndFlushNextMessageIfPossible
Фактически, он отправляет данные вниз по течению:
Когда вы получите данные, вы будете судитьCredit
Достаточно, недостаточно, чтобы вызвать исключение:
Давайте нарисуем картинку, чтобы понять это просто:
Сводка противодавления
Скорость обработки «вниз по течению» не может угнаться за скоростью отправки «вверх по течению», что снижает скорость обработки, что кажется очень хорошим (в конце концов, это, кажется, помогает нам ограничивать ток).
но вFlink
, противодавление плюсCheckponit
механизм, который может привести кState
Состояние продолжает увеличиваться, а завершение откладываетсяcheckpoint
Скорость падает даже с таймаутом.
когдаcheckpoint
Противодавление усугубляется, когда скорость обработки задерживается (вероятно, обработка большую часть времениcheckpoint
).
когдаcheckpoint
Если вы не можете этого сделать, это означает перезапускFlink
Приложение будет завершено с последнего разаcheckpoint
сделать это снова (...
Вот пример, с которым я действительно столкнулся:
у меня есть
Flink
Миссия, я дал ей только однуTaskManager
Для выполнения задачи обнаружено, что при обновлении БД возникнут проблемы с параллелизмом.только один
TaskManager
Проблема с позиционированием очень проста, я сделал небольшое суждение: я обновил Sink-параллелизм БД, чтобы увеличить.Если для параллелизма приемника установлено значение 1, проблемы параллелизма определенно нет, но обработка слишком медленная.
Так что я просто основан на Раковине
userId
провестиkeyBy
(Один и тот же идентификатор пользователя обрабатывается одним и тем же потоком, поэтому проблемы параллелизма нет)
выглядит красиво, ноuserId
Есть проблема с горячими данными, что приводит к формированию нисходящей обработки данных.反压
. первоначально один разcheckpoint
выполнять только30~40ms
,反压
в следующий разcheckpoint
нужно2min+
.
checkpoint
Интервал выполнения относительно частый (6s/次
),время исполнения2min+
, и в итоге данные не могут обрабатываться все время, и скорость потребления всей ссылки меняется от исходной3000qps
к обратному давлению300qps
, был заблокирован (проблем с программой нет, то есть скорость обработки сильно снижена, что сказывается на конечном выводе данных).
В конце концов
Я думал об этой статье, чтобы поставить противодавление иCheckpoint
Я написал их все вместе, но обнаружил, что это было немного длинно, а потомcheckpoint
Начнем следующий.
поверьте мне, пока вы используетеFlink
, Рано или поздно вы столкнетесь с такой проблемой. Некоторые студенты могут не понять ее сейчас. Это не имеет значения. Поставьте лайк 👍, сохраните и используйте позже.
Использованная литература:
Санвай организовал все [Точки знаний на собеседовании в Дачане], [Шаблон резюме] и [Оригинальную статью] в электронную книгу, содержащую в общей сложности 1263 страницы! нажмите нижеСсылка на сайтПросто возьмите это прямо
Содержимое PDF-документоввсе вручную, если вы ничего не понимаете, вы можете напрямуюспросите меня