Flink обратное давление

Java Flink
Flink обратное давление

предисловие

Поиск в WeChat【Java3y] Подписывайтесь на этого незатейливого мужика, ставьте лайки и подписывайтесь - это самая большая поддержка для меня!

Текст был включен в мой GitHub:GitHub.com/Zhongf UC очень…, с более чем 300 оригинальными статьями, недавно опубликованнымиИнтервью и проектыряд!

недавно мигрировалFlinkСвязанные проекты также наступили на некоторые ямы в течение периода.checkpointа также反压является одним из них.

Ао Бин слишком хорош.FlinkНет, я могу сделать это только сам.Смотреть на Ао Бин можно только с удовольствием, учиться приемам все равно придется смотреть на три кривых

Обычно у меня нет никакого уровня Ао Бинхей, поэтому, если я возьму что-то простое, я скажу, что не могу.Я черный фанат Ао Бин номер один

сегодня поделитьсяFlinkизcheckpointмеханизм и背压Впринципе, я считаю, что через эту статью все играютFlinkполучить более глубокое пониманиеCheckpointКак это реализовано, и может быть удобнее при настройке соответствующих параметров и использовании.

Предыдущая статья была написанаFlinkВводный урок по , если вы еще этого не знаетеFlinkстуденты могут пойти посмотреть: "Начало работы с Флинком

Напоминание в первом ряду: эта статья основана на Flink 1.7.

"Мелкий вход и выходИзучите знания Flink о противодавлении"

закуска

объяснениеFlinkизcheckPointа также背压Прежде чем механизм, давайте посмотрим наcheckpointа также背压Соответствующие основы полезны для последующего понимания.

Как пользователи, мы пишемFlinkПрограмма представлена ​​на платформе управления,FlinkОн работает (пока нет проблем с программным кодом), а подробности скрыты от пользователя.

На самом деле общий процесс выглядит следующим образом:

  1. FlinkСогласно коду, который мы написали, он сгенерируетStreamGraphДиаграмма выходит, чтобы представить топологию программы, которую мы написали.
  2. затем перед совершениемStreamGraphэта картинкаоптимизацияГорстка (задачи, которые можно объединить, чтобы объединиться), статьJobGraph
  3. будетJobGraphподчинятьсяJobManager
  4. JobManagerпосле полученияJobGraphпозже согласноJobGraphгенерироватьExecutionGraph (ExecutionGraphдаJobGraphраспараллеленная версия)
  5. TaskManagerПосле получения задания онExecutionGraph генерируется как реальный物理执行图

можно увидеть物理执行图действительно бежатьTaskManagerначальствоTransformа такжеSinkмеждуResultPartitionа такжеInputGateЭти два компонента,ResultPartitionиспользуется для отправки данных иInputGateиспользуется для получения данных.

заблокировать этиGraph, его можно найтиFlinkСхема:Client->JobManager->TaskManager

Как видно из названия,JobManagerэто "управление", иTaskManagerдействительно работает. Вернемся к нашей сегодняшней теме,checkpointкJobManagerпроблема.

а такжеFlinkсам по себесостояниеиз,Flinkпозволяет вам выбиратьданные во время выполненияГде сохраняться, на данный момент есть три места, вFlinkугол называетсяState Backends:

  • MemoryStateBackend(ОЗУ)
  • FsStateBackend(файловая система, обычно HSFS)
  • RocksDBStateBackend(база данных RocksDB)

такой же,checkpointИнформация также хранится вState Backendsначальство

крысиное дерьмо

недавно вStormмигрироватьFlinkКогда я сталкиваюсь с проблемой, позвольте мне кратко описать предысторию.

Мы очищаем данные из различных источников данных, с помощьюFlinkпочистили, собрали в широкую модель и, наконец, передалиkylinДелайте статистику данных практически в реальном времени и отображайте для просмотра операций в реальном времени.

В процессе миграции обнаруживается, чтоtopicПотребление было задержано в течение длительного времени, и первоначальное подозрение связано с тем, что заказ находится выше по течению.并发度Недостаточно повлиять, поэтому я настроил параллелизм на обоих концах и переиздал.

В процессе публикации, после того, как система заработает, посмотрите на нее еще раз.topicМониторинг задержек потребления сбивает с толку. Какие? Почему так долго? Это вовсе не означало опускаться.

В это время я могу только просить помощи у великих богов в группе.После расследования он сказал: ЭтоcheckpointЯ не делал это все время, это все заблокировано, и оно будет выпущено только в последний раз, когда оно будет переиздано.checkpointначать, потому чтоcheckpointОна давно не дорабатывалась, поэтому объем переопубликованных данных будет очень большим. Нет хорошего способа сделать это, его можно только выкинуть под этой заблокированной ссылкой, по оценкам, есть проблема с бизнес-логикой.

Голос за кадром: Получив данные заказа, перейдем кОтслеживаемостьНажмите, чтобы определить, из какой компании поступил заказ, через какую компанию он прошел и какая компания в конечном итоге привела к выполнению заказа.

Голос за кадром: Когда внешнее используется,полагатьсяДанные "Результат заказа HBase"

Мы считаем, что данные по щелчку могут быть медленнее, чем обработка данных заказа, поэтому данные, которые не могут быть найдены, будут опрашиваться через определенные промежутки времени, и посколькуFlinkпоставкаState«статус» иcheckpointмеханизм, мы помещаем данные, которые не могут быть найдены вListStateПросто опрос в определенное время (даже если система зависнет из-за перезагрузки или других причин, данные не потеряются).

Теоретически, пока нет проблем, эта схема осуществима. Но теперь результат говорит нам: после того, как приходит отчет с данными о заказе,небольшой пакет данныхЕсли данные не найдены в «HBase результатов заказа», они помещаются вListState, а затем пройти часть данныхListState. Последствия:

  • Потребление данных не может прийти, формаобратное давление
  • checkpointникогда не удавалось

Способ справиться с этим в то время заключался в том, чтобы очистить ListState, временно отбросить эту часть данных и позволить данным идти в ногу с прогрессом.

Позже, после расследования, было обнаружено, что апстрим «манипулировал» полем сообщения, и сбой парсинга привел к потере кликов, что привело к этой цепочке последствий.

Ключом к устранению неполадок является пониманиеFlinkиз反压а такжеcheckpointВ чем принцип этого, позвольте мне объяснить это ниже.

обратное давление

обратное давлениеbackpressureЭто очень распространенная проблема в потоковых вычислениях.Это означает, что узел в конвейере данных становится узким местом, скорость обработки не может угнаться за скоростью отправки данных «вверх по течению», а восходящему потоку необходимо ограничить скорость.

На картинке выше показано состояние минимального противодавления, грубо говоря, это:Нисходящий поток не может с этим справиться, восходящий поток должен замедлиться, он будет заблокирован!

Самое интересное: "Как нисходящий поток уведомил восходящий поток о необходимости отправлять медленнее?"

спередиFlinkОсновы объяснения мы видимResultPartitionотправить данные,InputGateиспользуется для получения данных.

а такжеFlinkвTaskManagerПри чтении и записи данных внутри будетBufferPool(буферный пул) для этогоTaskManagerчитать и писать использовать (аTaskManagerподелиться однимBufferPool), каждый читает и пишетResultPartition/InputGateбудут обращаться за своимиLocalBuffer

В качестве примера взято изображение выше.Если нисходящая обработка не может быть обработана, тоInputGateизLocalBufferОн заполнен? После заполнения,ResultPartitionневозможно ли пойтиInputGateОпубликовано? а такжеResultPartitionЕсли вы не можете отправить его, это его собственноеLocalBufferОн будет заполнен рано или поздно, разве что по этой логике, покаSourceданные не вытягиваются...

Этот процесс подобенInputGate/ResultPartitionоткрывать свои собственныеОграниченная очередь блокировки, так или иначе, "я" не могу справиться с таким количеством, пришлите мне, и я заблокирую его, когда оно будет заполнено, образуя цепную реакцию, чтобы заблокировать источник...

выше только одинTaskManagerслучае противодавления, что многократноTaskManagerШерстяная ткань? (В конце концов, у нас часто бывает несколькоTaskManagerработает у нас)

давайте оглянемся назадFlinkОбщая схема архитектуры потока данных связи:

Наглядно видно из рисунка: для телекоммуникацийNetty, нижний слойTCP Socketбыть реализованным.

Таким образом, с точки зрения макроэкономики несколькоTaskManagerЕще дваBuffer(буфер).

Согласно вышеприведенной идее, до тех пор, покаInputGateизLocalBufferбыть избитым,Netty Bufferрано или поздно он будет заполнен, иSocket BufferОн тоже рано или поздно заполнится (TCP сам имеет управление потоком), а потом обратная связь наResultPartition, данные не могут быть отправлены снова... что приводит к феномену обратного давления на весь канал передачи данных.

Теперь проблема возникает снова,TaskManagerизtaskНо их много, они всеобщийОдинTCP Buffer/Buffer Pool , то только один изtaskЕсть проблема со ссылкой, которая не вызывает весьTaskManagerСледить за катастрофой?

существуетFlink 1.5版本Раньше действительно была такая проблема. пока вFlink 1.5版本затем представилcreditмеханизм.

Сверху мы видимFlinkРеализованное противодавление макроскопически напрямую зависит от каждогоBufferОн заполнен, если он заполнен, он не может писать/читать, вызывая цепную реакцию, покаSourceконец.

а такжеcreditНа самом деле этот механизм можно просто понимать как «мелкозернистое» управление потоком: каждый разInputGateскажуResultPartitionСколько свободного места вы должны получить, пустьResultPartitionГлядя на волосы. еслиInputGateРассказыватьResultPartitionСвободного места нет, поэтомуResultPartitionОн не будет опубликован.

Как это достигается на самом деле? Получите исходный код!

Прежде чем развернуть исходный код, давайте взглянем на следующееДиаграмма физического исполнения: фактическиInPutGateследующийInputChannel,ResultPartitionследующийResultSubpartition(Это отражено в исходном коде).

InputGate (приемный конец обрабатывает противодавление)

Начнем с принимающей стороны.FlinkМетод получения данныхorg.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

Затем найдите логику для обработки противодавления:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

входитьgetNextNonBlocked()метод (выбранBarrierBufferвыполнить):

мы посмотрим прямоnull, см. отфаза инициализацииКак это началось, заходиgetNextBufferOrEvent()

Заходим в метод и видим еще два важных вызова:

requestPartitions();

result = currentChannel.getNextBuffer();

начать сrequestPartitions()Посмотрите на него и обнаружите, что внутри есть слой (отInputChannelполучить следующееsubPartition):

Так что идиrequestSubpartition()(СмотретьRemoteInputChannelпойми)

Здесь это похоже на созданиеClientend, а затем получить данные, отправленные восходящим потоком:

Первый взглядclientДавайте создадим позу в конце, введитеcreatePartitionRequestClient()способ увидеть (мы видимNettyреализация).

Нажмите на два этажа, и мы войдемcreatePartitionRequestClient()метод, вы можете легко найти его, просмотрев комментарии к исходному коду, которые создадутTCPподключить и создатьClientдля нашего использования

мы все еще видимnull, поэтому он находится здесь:

входитьconnect()Смотри как:

Давайте посмотрим на реализацию конкретной логики генерации, поэтому перейдите кgetClientChannelHandlersначальство

Случайно обнаружил, что в исходном коде также есть краткая блок-схема связи, которую мы можем увидеть (хахаха):

Хорошо, давайте посмотримgetClientChannelHandlersМетод, этот метод не длинный, в основном, чтобы судить о следующем поколенииclientОткрыть лиcreditBasedмеханизм:

public ChannelHandler[] getClientChannelHandlers() {
		NetworkClientHandler networkClientHandler =
			creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
				new PartitionRequestClientHandler();
		return new ChannelHandler[] {
			messageEncoder,
			new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
			networkClientHandler};
	}

Итак, нашnetworkClientHandlerэкземплярCreditBasedPartitionRequestClientHandler

Здесь мы предполагаем на данный моментClientТерминал был сгенерирован, а затемВернуться кgetNextBufferOrEvent()Сюда,requestPartitions()Метод заключается в генерации полученных данныхClientконец, конкретный примерCreditBasedPartitionRequestClientHandler

Далее мы идемgetNextBuffer()Посмотрите, как обрабатываются полученные данные:

После получения данных он начнет выполнять вызов кода нашего пользователя.processметод (здесь мы его рассматривать не будем). или вернутьсяобратное давлениеЛогически, мы, кажется, не видим, где логика обратного давления. Дело в томreceivedBuffersВот кто поставил?

Итак, мы видимClientконкретные примерыCreditBasedPartitionRequestClientHandler, открой список методов и посмотри на него, вроде какChannelRead()Вот так:

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			decodeMsg(msg);
		} catch (Throwable t) {
			notifyAllChannelsOfErrorAndClose(t);
		}
	}

следитьdecodeMsgПродолжайте идти вниз:

продолжить вниз кdecodeBufferOrEvent()

продолжить вниз кonBuffer:

Итак, мы идем кonSenderBacklogПроверьте это:

последний звонокnotifyCreditAvailableбудетCreditОтправить вверх по течению:

public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
	}

Наконец, нарисуйте картинку, чтобы понять одну (ключевая ссылка):

ResultPartition (отправитель обрабатывает противодавление)

На стороне отправителя мы начинаем сorg.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManagerначать смотреть

Итак, мы входимfromConfiguration()

входитьstart()смотри, затем вводиconnectionManager.start()(Еще см.Nettyпример):

image-20201206141859451

зайди и посмотриservice.init()Что делает метод (и снова видим знакомый рисунок):

Хорошо, давай еще разgetServerChannelHandlers()посмотри:

Имея описанный выше опыт, мы можем напрямую войти и увидеть его метод, да, это сноваchannnelRead, только в этот разchannelRead0.

хорошо, давай зайдемaddCredit()посмотри:

reader.addCredit(credit)только что обновил номер

public void addCredit(int creditDeltas) {
		numCreditsAvailable += creditDeltas;
	}

Давайте посмотрим на точкуenqueueAvailableReader()метод, в то время какenqueueAvailableReader()Суть в том, чтобы судитьCreditДостаточно ли отправить

isAvailableРеализация тоже очень простая, то есть судитьCreditЯвляется ли оно больше 0 и есть ли реальные данные для отправки

а такжеwriteAndFlushNextMessageIfPossibleФактически, он отправляет данные вниз по течению:

Когда вы получите данные, вы будете судитьCreditДостаточно, недостаточно, чтобы вызвать исключение:

Давайте нарисуем картинку, чтобы понять это просто:

Сводка противодавления

Скорость обработки «вниз по течению» не может угнаться за скоростью отправки «вверх по течению», что снижает скорость обработки, что кажется очень хорошим (в конце концов, это, кажется, помогает нам ограничивать ток).

но вFlink, противодавление плюсCheckponitмеханизм, который может привести кStateСостояние продолжает увеличиваться, а завершение откладываетсяcheckpointСкорость падает даже с таймаутом.

когдаcheckpointПротиводавление усугубляется, когда скорость обработки задерживается (вероятно, обработка большую часть времениcheckpoint).

когдаcheckpointЕсли вы не можете этого сделать, это означает перезапускFlinkПриложение будет завершено с последнего разаcheckpointсделать это снова (...

Вот пример, с которым я действительно столкнулся:

у меня естьFlinkМиссия, я дал ей только однуTaskManagerДля выполнения задачи обнаружено, что при обновлении БД возникнут проблемы с параллелизмом.

только одинTaskManagerПроблема с позиционированием очень проста, я сделал небольшое суждение: я обновил Sink-параллелизм БД, чтобы увеличить.

Если для параллелизма приемника установлено значение 1, проблемы параллелизма определенно нет, но обработка слишком медленная.

Так что я просто основан на РаковинеuserIdпровестиkeyBy(Один и тот же идентификатор пользователя обрабатывается одним и тем же потоком, поэтому проблемы параллелизма нет)

выглядит красиво, ноuserIdЕсть проблема с горячими данными, что приводит к формированию нисходящей обработки данных.反压. первоначально один разcheckpointвыполнять только30~40ms,反压в следующий разcheckpointнужно2min+.

checkpointИнтервал выполнения относительно частый (6s/次),время исполнения2min+, и в итоге данные не могут обрабатываться все время, и скорость потребления всей ссылки меняется от исходной3000qpsк обратному давлению300qps, был заблокирован (проблем с программой нет, то есть скорость обработки сильно снижена, что сказывается на конечном выводе данных).

В конце концов

Я думал об этой статье, чтобы поставить противодавление иCheckpointЯ написал их все вместе, но обнаружил, что это было немного длинно, а потомcheckpointНачнем следующий.

поверьте мне, пока вы используетеFlink, Рано или поздно вы столкнетесь с такой проблемой. Некоторые студенты могут не понять ее сейчас. Это не имеет значения. Поставьте лайк 👍, сохраните и используйте позже.

Использованная литература:

Санвай организовал все [Точки знаний на собеседовании в Дачане], [Шаблон резюме] и [Оригинальную статью] в электронную книгу, содержащую в общей сложности 1263 страницы! нажмите нижеСсылка на сайтПросто возьмите это прямо

Содержимое PDF-документоввсе вручную, если вы ничего не понимаете, вы можете напрямуюспросите меня