Некоторые из упомянутых выше операторов и функций могут выполнять некоторые временные операции, но не могут получить текущее время обработки или отметку времени водяного знака оператора, который прост в вызове, но относительно ограничен в функциях. Если вы хотите получить отметку времени водяного знака в потоке данных или перемещаться вперед и назад во времени, вам нужно использоватьProcessFunction
Ряд функций, которые представляют собой API-интерфейсы самого низкого уровня в системе Flink, предоставляют более подробные разрешения на операции с потоками данных. Flink SQL реализован на основе этих функций, и некоторые бизнес-сценарии, требующие высокой степени персонализации, также должны использовать эти функции.
В настоящее время эта серия функций в основном включаетKeyedProcessFunction
,ProcessFunction
,CoProcessFunction
,KeyedCoProcessFunction
,ProcessJoinFunction
а такжеProcessWindowFunction
Многие функции, эти функции сосредоточены, но основная функция аналогична, в основном включает два момента:
-
Состояние: мы можем получить доступ и обновить Keyed State в этих функциях.
-
Таймер: Установите таймер, как установку будильника Мы можем разработать более сложную бизнес-логику во временном измерении.
Введение в статус можно сослаться на мою статью:Подробное объяснение управления состоянием Flink,Здесь мы сосредоточимся на использованииProcessFunction
Несколько других особенностей. Весь код в этой статье загружен на мой гитхаб:Github.com/ Дорога положительная/...
Как использовать Таймер
Мы можем понимать Таймер как будильник.Перед использованием зарегистрируйте будущее время в Таймере.Когда это время наступит, будильник "зазвенит", программа выполнит функцию обратного вызова, а в обратном вызове будет выполнена определенная бизнес-логика функция. здесь сKeyedProcessFunction
Например, ввести регистрацию и использование Timer.
ProcessFunction
Есть два важных интерфейсаprocessElement
а такжеonTimer
,вprocessElement
Подпись Java функции в исходном коде выглядит следующим образом:
// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)
processElement
метод обрабатывает элемент в потоке данных и передаетCollector<O>
выход.Context
заключается в том, что он отличается отFlatMapFunction
Такие функции, как обычные функции, разработчики могут передатьContext
чтобы получить метку времени, посетитеTimerService
, установите Таймер.
Другой интерфейс естьonTimer
:
// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
Это функция обратного вызова, когда наступит время «будильника», Flink позвонитonTimer
и выполнить некоторую бизнес-логику. Здесь также есть параметрOnTimerContext
, который фактически наследует предыдущийContext
,а такжеContext
почти то же самое.
Основная логика метода с использованием Timer такова:
- существует
processElement
через методContext
Зарегистрируйте будущую метку времени t. Семантика этой метки времени может быть временем обработки или временем события, которое выбирается в соответствии с бизнес-требованиями. - существует
onTimer
Реализуйте некоторую логику в методе для достижения времени t,onTimer
метод вызывается автоматически.
отContext
мы можем получитьTimerService
, который представляет собой интерфейс для доступа к меткам времени и таймерам. мы можем пройтиContext.timerService.registerProcessingTimeTimer
или ``Context.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过
Context.timerService.deleteProcessingTimeTimer和
Context.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:
Context.timerService.currentProcessingTime和
Context.timerService.currentWatermark`. Как видно из имени функции, вот функции, которые появляются парами, и два метода соответствуют семантике двух времен соответственно.
Обратите внимание, что мы можем толькоKeyedStream
Зарегистрируйте таймер на . Разные таймеры могут быть зарегистрированы с разными временными метками для каждого ключа, но только один таймер может быть зарегистрирован для каждой временной метки каждого ключа. Если вы хотите быть вDataStream
Применив Timer сверху, все данные могут быть сопоставлены с поддельным ключом, но все данные будут поступать в одну подзадачу оператора.
Давайте еще раз объясним, как использовать таймер в сценарии торговли акциями. Сделка с акциями включает в себя: код акции, метку времени, цену акции и объем торгов. Теперь мы хотим увидеть, росли ли акции непрерывно в течение 10 секунд, и если да, то отправить предупреждение.
case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)
class IncreaseAlertFunction(intervalMills: Long)
extends KeyedProcessFunction[String, StockPrice, String] {
// 状态:保存某支股票上次交易价格
lazy val lastPrice: ValueState[Double] =
getRuntimeContext.getState(
new ValueStateDescriptor[Double]("lastPrice", Types.of[Double])
)
// 状态:保存某支股票的定时器时间戳
lazy val currentTimer: ValueState[Long] =
getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
override def processElement(stock: StockPrice,
context: KeyedProcessFunction[String, StockPrice, String]#Context,
out: Collector[String]): Unit = {
// 获取lastPrice状态中的数据,第一次使用时会被初始化为0
val prevPrice = lastPrice.value()
// 更新lastPrice
lastPrice.update(stock.price)
val curTimerTimestamp = currentTimer.value()
if (prevPrice == 0.0) {
// 第一次使用,不做任何处理
} else if (stock.price < prevPrice) {
// 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留
context.timerService().deleteEventTimeTimer(curTimerTimestamp)
currentTimer.clear()
} else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
// 如果新流入的股票价格升高
// curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer
// 新Timer = 当前时间 + interval
val timerTs = context.timestamp() + intervalMills
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
context.timerService().registerEventTimeTimer(timerTs)
// 更新currentTimer状态,后续数据会读取currentTimer,做相关判断
currentTimer.update(timerTs)
}
}
override def onTimer(ts: Long,
ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext,
out: Collector[String]): Unit = {
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey +
" monotonically increased for " + intervalMills + " millisecond.")
// 清空currentTimer状态
currentTimer.clear()
}
}
В основной логике передайте следующееprocess
вызов оператораKeyedProcessFunction
:
val inputStream: DataStream[StockPrice] = ...
val warnings = inputStream
.keyBy(stock => stock.symbol)
// 调用process函数
.process(new IncreaseAlertFunction(10000))
При контрольной точке таймер также будет сохранен вместе с другими данными о состоянии. Если некоторые таймеры установлены с использованием семантики времени обработки, а отметка времени истекла при перезапуске, эти функции обратного вызова будут вызваны и выполнены немедленно.
Боковой выход Боковой выход
ProcessFunction
Другая основная особенность заключается в том, что часть данных может быть отправлена в другой поток, а типы данных двух потоков могут быть разными. Мы проходимOutputTag[T]
чтобы пометить другой поток данных. существуетProcessFunction
Отфильтруйте данные определенного типа следующим образом:
class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] {
override def processElement(stock: Stock,
context: KeyedProcessFunction[String, Stock, String]#Context,
out: Collector[String]): Unit = {
// 其他业务逻辑...
// 定义一个OutputTag,Stock为这个SideOutput流的数据类型
val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade")
if (stock.volume > 1000) {
// 将Stock筛选出来发送到该OutputTag下
context.output(highVolumeOutput, stock)
}
}
}
В основной логике побочный вывод получается следующим методом:
// 收集SideOutput
val outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")
val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)
Как видно из этого примера,KeyedProcessFunction
Тип выводаString
, а тип вывода SideOutput —Stock
, они могут быть разными.
использоватьProcessFunction
Реализовать присоединение
Если вы хотите реализовать объединение двух потоков данных с более высокой степенью детализации, вы можете использоватьCoProcessFunction
илиKeyedCoProcessFunction
. Обе функции имеютprocessElement1
а такжеprocessElement2
способ обработки каждого элемента первого потока данных и второго потока данных по отдельности. Типы данных и типы вывода двух потоков данных могут отличаться друг от друга. Хотя данные поступают из двух разных потоков, они могут иметь одно и то же состояние, поэтому вы можете обратиться к следующей логике для реализации соединения:
- Создайте множество состояний, оба потока данных могут получить доступ к этим состояниям, здесь как состояние A в качестве примера.
-
processElement1
Метод обрабатывает первый поток данных, обновляя состояние a. -
processElement2
Метод обрабатывает второй поток данных и на основе данных в состоянии а генерирует соответствующий вывод.
На этот раз мы обсудим два потока данных о цене акций в сочетании с оценкой СМИ Предположим, что существует поток данных оценки СМИ для определенной акции, и этот поток данных содержит положительные и отрицательные оценки акций. Два потока данных текут вместеKeyedCoProcessFunction
,processElement2
Метод обрабатывает входящие данные мультимедиа, обновляя рейтинги мультимедиа до состоянияmediaState
начальство,processElement1
Метод обрабатывает входящие данные о транзакциях с акциями, получает состояние mediaState` и генерирует новый поток данных. Эти два метода обрабатывают два потока данных по отдельности, совместно используют состояние и взаимодействуют через состояние.
В основной логике мы объединяем два потока данныхconnect
, затем по бегущей строкеkeyBy
, а затем используйтеprocess
оператор:
val stockPriceRawStream: DataStream[StockPrice] = ...
val mediaStatusStream: DataStream[Media] = ...
val warnings = stockStream.connect(mediaStream)
.keyBy(0, 0)
// 调用process函数
.process(new AlertProcessFunction())
KeyedCoProcessFunction
Конкретная реализация:
class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] {
// mediaState
private var mediaState: ValueState[String] = _
override def open(parameters: Configuration): Unit = {
// 从RuntimeContext中获取状态
mediaState = getRuntimeContext.getState(
new ValueStateDescriptor[String]("mediaStatusState", classOf[String]))
}
override def processElement1(stock: StockPrice,
context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
collector: Collector[StockPrice]): Unit = {
val mediaStatus = mediaState.value()
if (null != mediaStatus) {
val newStock = stock.copy(mediaStatus = mediaStatus)
collector.collect(newStock)
}
}
override def processElement2(media: Media,
context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
collector: Collector[StockPrice]): Unit = {
// 第二个流更新mediaState
mediaState.update(media.status)
}
}
Этот пример относительно прост. Таймер не используется. В реальных бизнес-сценариях состояние обычно использует таймер для очистки состояния с истекшим сроком действия. Пример сплайсинга машинного обучения многих интернет-приложений может полагаться на эту функцию для реализации: функции машинного обучения сервера генерируются в режиме реального времени, а поведение пользователя в приложении генерируется после взаимодействия Эти два принадлежат к двум разным потокам данных , что может быть. Эта логика соединяет два потока данных вместе и быстрее получает образцы данных для следующего этапа машинного обучения путем объединения. Промежуточные данные двух потоков данных помещаются в состояние.Чтобы избежать бесконечного роста состояния, необходимо использовать Таймер для очистки просроченного состояния.
Обратите внимание, что при использовании времени события водяной знак должен быть установлен для обоих потоков данных.Задаются только время события и водяной знак одного потока.CoProcessFunction
а такжеKeyedCoProcessFunction
используйте функцию таймера вprocess
Операторы не могут определить, когда они должны обрабатывать данные.