Серия Flink (6) - оконная модель Flink

Flink

1. Концепция окна

В большинстве сценариев поток данных, который нам нужно подсчитать, не ограничен, поэтому мы не можем ждать завершения всего потока данных перед подсчетом. Обычно нам нужно только выполнять статистический анализ данных в пределах определенного диапазона времени или количества: например, подсчитывать клики всех продуктов за последний час каждые пять минут или после каждой 1000 кликов. рейтинг кликов для каждого элемента. Во Flink мы используем окна (Window) для реализации таких функций. В соответствии с различными статистическими размерами окна во Flink можно разделить на окна времени и окна счета.

2. Окна времени

Окна времени используются для агрегирования данных со временем в качестве измерения, которое делится на следующие четыре категории:

2.1 Tumbling Windows

Переворачивающиеся окна — это окна, которые не перекрывают друг друга. Например, если клики товаров за последний 1 час подсчитываются каждый 1 час, то 1 день можно разделить только на 24 окна, и каждое окно не перекрывается друг с другом, следующим образом:

https://github.com/heibaiying

Здесь мы берем статистику частоты слов в качестве примера, чтобы дать конкретный вариант использования Код выглядит следующим образом:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的数据输入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        String[] words = value.split("\t");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1L));
        }
    }
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每个单词出现的数量
env.execute("Flink Streaming");

Результаты теста следующие:

https://github.com/heibaiying

2.2 Sliding Windows

Скользящие окна используются для анализа скользящей агрегации, например, если клики всех товаров за последний час подсчитываются каждые 6 минут, статистические окна перекрывают друг друга, то есть сутки можно разделить на 240 окон. Схема выглядит следующим образом:

https://github.com/heibaiying

Можно видеть, что все четыре окна, окна 1-4, имеют одинаковое время перекрытия друг с другом. Чтобы реализовать скользящее окно, вам нужно только передать дополнительный второй параметр в качестве времени прокрутки при использовании метода timeWindow следующим образом:

// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))

2.3 Session Windows

Когда пользователь постоянно просматривает страницы, данные о кликах могут быть всегда. Например, во время интервала активности пользователь может часто добавлять и удалять определенные типы продуктов в корзину, и вам нужно знать только время последнего просмотра. Состояние корзины может быть засчитано после завершения сеанса, проведенного пользователем. Чтобы получить этот тип статистики, вы можете сделать это через Session Windows.

https://github.com/heibaiying

Конкретный код реализации выглядит следующим образом:

// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准    
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

2.4 Global Windows

Последнее окно является глобальным окном.Глобальное окно будет назначать все элементы с одним и тем же ключом одному и тому же окну, которое обычно используется с триггерами. Если нет соответствующего триггера, расчет не будет выполнен.

https://github.com/heibaiying

Здесь мы продолжим использовать приведенный выше случай статистики частоты слов в качестве примера.Пример кода выглядит следующим образом:

// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();

3. Подсчитайте окна

Окна подсчета используются для агрегирования данных с количеством в качестве измерения.Оно также делится на скользящее окно и скользящее окно.Метод реализации точно такой же, как и у временного окна, но вызываемый API отличается следующим образом:

// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

На самом деле внутренняя часть окна подсчета реализована путем вызова глобального окна, которое мы представили в предыдущем разделе.Исходный код выглядит следующим образом:

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}


public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
        .evictor(CountEvictor.of(size))
        .trigger(CountTrigger.of(slide));
}

использованная литература

Флинк Windows:this.apache.org/projects/legal…

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным