ProcessFunction: руководство по использованию API нижнего уровня Flink.

Flink
ProcessFunction: руководство по использованию API нижнего уровня Flink.

Некоторые из упомянутых выше операторов и функций могут выполнять некоторые временные операции, но не могут получить текущее время обработки или отметку времени водяного знака оператора, который прост в вызове, но относительно ограничен в функциях. Если вы хотите получить отметку времени водяного знака в потоке данных или перемещаться вперед и назад во времени, вам нужно использоватьProcessFunctionРяд функций, которые представляют собой API-интерфейсы самого низкого уровня в системе Flink, предоставляют более подробные разрешения на операции с потоками данных. Flink SQL реализован на основе этих функций, и некоторые бизнес-сценарии, требующие высокой степени персонализации, также должны использовать эти функции.

二维码

В настоящее время эта серия функций в основном включаетKeyedProcessFunction,ProcessFunction,CoProcessFunction,KeyedCoProcessFunction,ProcessJoinFunctionа такжеProcessWindowFunctionМногие функции, эти функции сосредоточены, но основная функция аналогична, в основном включает два момента:

  • Состояние: мы можем получить доступ и обновить Keyed State в этих функциях.

  • Таймер: Установите таймер, как установку будильника Мы можем разработать более сложную бизнес-логику во временном измерении.

Введение в статус можно сослаться на мою статью:Подробное объяснение управления состоянием Flink,Здесь мы сосредоточимся на использованииProcessFunctionНесколько других особенностей. Весь код в этой статье загружен на мой гитхаб:Github.com/ Дорога положительная/...

Как использовать Таймер

Мы можем понимать Таймер как будильник.Перед использованием зарегистрируйте будущее время в Таймере.Когда это время наступит, будильник "зазвенит", программа выполнит функцию обратного вызова, а в обратном вызове будет выполнена определенная бизнес-логика функция. здесь сKeyedProcessFunctionНапример, ввести регистрацию и использование Timer.

ProcessFunctionЕсть два важных интерфейсаprocessElementа такжеonTimerprocessElementПодпись Java функции в исходном коде выглядит следующим образом:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElementметод обрабатывает элемент в потоке данных и передаетCollector<O>выход.Contextзаключается в том, что он отличается отFlatMapFunctionТакие функции, как обычные функции, разработчики могут передатьContextчтобы получить метку времени, посетитеTimerService, установите Таймер.

Другой интерфейс естьonTimer:

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

Это функция обратного вызова, когда наступит время «будильника», Flink позвонитonTimerи выполнить некоторую бизнес-логику. Здесь также есть параметрOnTimerContext, который фактически наследует предыдущийContext,а такжеContextпочти то же самое.

Основная логика метода с использованием Timer такова:

  1. существуетprocessElementчерез методContextЗарегистрируйте будущую метку времени t. Семантика этой метки времени может быть временем обработки или временем события, которое выбирается в соответствии с бизнес-требованиями.
  2. существуетonTimerРеализуйте некоторую логику в методе для достижения времени t,onTimerметод вызывается автоматически.

отContextмы можем получитьTimerService, который представляет собой интерфейс для доступа к меткам времени и таймерам. мы можем пройтиContext.timerService.registerProcessingTimeTimerили ``Context.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark`. Как видно из имени функции, вот функции, которые появляются парами, и два метода соответствуют семантике двух времен соответственно.

Обратите внимание, что мы можем толькоKeyedStreamЗарегистрируйте таймер на . Разные таймеры могут быть зарегистрированы с разными временными метками для каждого ключа, но только один таймер может быть зарегистрирован для каждой временной метки каждого ключа. Если вы хотите быть вDataStreamПрименив Timer сверху, все данные могут быть сопоставлены с поддельным ключом, но все данные будут поступать в одну подзадачу оператора.

Давайте еще раз объясним, как использовать таймер в сценарии торговли акциями. Сделка с акциями включает в себя: код акции, метку времени, цену акции и объем торгов. Теперь мы хотим увидеть, росли ли акции непрерывно в течение 10 секунд, и если да, то отправить предупреждение.

case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)

class IncreaseAlertFunction(intervalMills: Long)
extends KeyedProcessFunction[String, StockPrice, String] {

  // 状态:保存某支股票上次交易价格
  lazy val lastPrice: ValueState[Double] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("lastPrice", Types.of[Double])
  )

  // 状态:保存某支股票的定时器时间戳
  lazy val currentTimer: ValueState[Long] =
  getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timer", Types.of[Long])
  )

  override def processElement(stock: StockPrice,
                              context: KeyedProcessFunction[String, StockPrice, String]#Context,
                              out: Collector[String]): Unit = {

    // 获取lastPrice状态中的数据,第一次使用时会被初始化为0
    val prevPrice = lastPrice.value()
    // 更新lastPrice
    lastPrice.update(stock.price)
    val curTimerTimestamp = currentTimer.value()
    if (prevPrice == 0.0) {
      // 第一次使用,不做任何处理
    } else if (stock.price < prevPrice) {
      // 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留
      context.timerService().deleteEventTimeTimer(curTimerTimestamp)
      currentTimer.clear()
    } else if (stock.price >= prevPrice && curTimerTimestamp == 0) {
      // 如果新流入的股票价格升高
      // curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer
      // 新Timer = 当前时间 + interval
      val timerTs = context.timestamp() + intervalMills

      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      context.timerService().registerEventTimeTimer(timerTs)
      // 更新currentTimer状态,后续数据会读取currentTimer,做相关判断
      currentTimer.update(timerTs)
    }
  }

  override def onTimer(ts: Long,
                       ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext,
                       out: Collector[String]): Unit = {

    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey +
                " monotonically increased for " + intervalMills + " millisecond.")
    // 清空currentTimer状态
    currentTimer.clear()
  }
}

В основной логике передайте следующееprocessвызов оператораKeyedProcessFunction:

val inputStream: DataStream[StockPrice] = ...
val warnings = inputStream
      .keyBy(stock => stock.symbol)
      // 调用process函数
      .process(new IncreaseAlertFunction(10000))

При контрольной точке таймер также будет сохранен вместе с другими данными о состоянии. Если некоторые таймеры установлены с использованием семантики времени обработки, а отметка времени истекла при перезапуске, эти функции обратного вызова будут вызваны и выполнены немедленно.

Боковой выход Боковой выход

ProcessFunctionДругая основная особенность заключается в том, что часть данных может быть отправлена ​​в другой поток, а типы данных двух потоков могут быть разными. Мы проходимOutputTag[T]чтобы пометить другой поток данных. существуетProcessFunctionОтфильтруйте данные определенного типа следующим образом:

class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] {

  override def processElement(stock: Stock,
                              context: KeyedProcessFunction[String, Stock, String]#Context,
                              out: Collector[String]): Unit = {

    // 其他业务逻辑...
    // 定义一个OutputTag,Stock为这个SideOutput流的数据类型
    val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade")

    if (stock.volume > 1000) {
      // 将Stock筛选出来发送到该OutputTag下
      context.output(highVolumeOutput, stock)
    }
  }
}

В основной логике побочный вывод получается следующим методом:

// 收集SideOutput
val outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")
val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)

Как видно из этого примера,KeyedProcessFunctionТип выводаString, а тип вывода SideOutput —Stock, они могут быть разными.

использоватьProcessFunctionРеализовать присоединение

Если вы хотите реализовать объединение двух потоков данных с более высокой степенью детализации, вы можете использоватьCoProcessFunctionилиKeyedCoProcessFunction. Обе функции имеютprocessElement1а такжеprocessElement2способ обработки каждого элемента первого потока данных и второго потока данных по отдельности. Типы данных и типы вывода двух потоков данных могут отличаться друг от друга. Хотя данные поступают из двух разных потоков, они могут иметь одно и то же состояние, поэтому вы можете обратиться к следующей логике для реализации соединения:

  • Создайте множество состояний, оба потока данных могут получить доступ к этим состояниям, здесь как состояние A в качестве примера.
  • processElement1Метод обрабатывает первый поток данных, обновляя состояние a.
  • processElement2Метод обрабатывает второй поток данных и на основе данных в состоянии а генерирует соответствующий вывод.

На этот раз мы обсудим два потока данных о цене акций в сочетании с оценкой СМИ Предположим, что существует поток данных оценки СМИ для определенной акции, и этот поток данных содержит положительные и отрицательные оценки акций. Два потока данных текут вместеKeyedCoProcessFunction,processElement2Метод обрабатывает входящие данные мультимедиа, обновляя рейтинги мультимедиа до состоянияmediaStateначальство,processElement1Метод обрабатывает входящие данные о транзакциях с акциями, получает состояние mediaState` и генерирует новый поток данных. Эти два метода обрабатывают два потока данных по отдельности, совместно используют состояние и взаимодействуют через состояние.

В основной логике мы объединяем два потока данныхconnect, затем по бегущей строкеkeyBy, а затем используйтеprocessоператор:

val stockPriceRawStream: DataStream[StockPrice] = ...
val mediaStatusStream: DataStream[Media] = ...
val warnings = stockStream.connect(mediaStream)
      .keyBy(0, 0)
      // 调用process函数
      .process(new AlertProcessFunction())

KeyedCoProcessFunctionКонкретная реализация:

class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] {

  // mediaState
  private var mediaState: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {

    // 从RuntimeContext中获取状态
    mediaState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("mediaStatusState", classOf[String]))

  }

  override def processElement1(stock: StockPrice,
                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
                               collector: Collector[StockPrice]): Unit = {

    val mediaStatus = mediaState.value()
    if (null != mediaStatus) {
      val newStock = stock.copy(mediaStatus = mediaStatus)
      collector.collect(newStock)
    }

  }

  override def processElement2(media: Media,
                               context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,
                               collector: Collector[StockPrice]): Unit = {
    // 第二个流更新mediaState
    mediaState.update(media.status)
  }

}

Этот пример относительно прост. Таймер не используется. В реальных бизнес-сценариях состояние обычно использует таймер для очистки состояния с истекшим сроком действия. Пример сплайсинга машинного обучения многих интернет-приложений может полагаться на эту функцию для реализации: функции машинного обучения сервера генерируются в режиме реального времени, а поведение пользователя в приложении генерируется после взаимодействия Эти два принадлежат к двум разным потокам данных , что может быть. Эта логика соединяет два потока данных вместе и быстрее получает образцы данных для следующего этапа машинного обучения путем объединения. Промежуточные данные двух потоков данных помещаются в состояние.Чтобы избежать бесконечного роста состояния, необходимо использовать Таймер для очистки просроченного состояния.

Обратите внимание, что при использовании времени события водяной знак должен быть установлен для обоих потоков данных.Задаются только время события и водяной знак одного потока.CoProcessFunctionа такжеKeyedCoProcessFunctionиспользуйте функцию таймера вprocessОператоры не могут определить, когда они должны обрабатывать данные.