1. Концепция окна
В большинстве сценариев поток данных, который нам нужно подсчитать, не ограничен, поэтому мы не можем ждать завершения всего потока данных перед подсчетом. Обычно нам нужно только выполнять статистический анализ данных в пределах определенного диапазона времени или количества: например, подсчитывать клики всех продуктов за последний час каждые пять минут или после каждой 1000 кликов. рейтинг кликов для каждого элемента. Во Flink мы используем окна (Window) для реализации таких функций. В соответствии с различными статистическими размерами окна во Flink можно разделить на окна времени и окна счета.
2. Окна времени
Окна времени используются для агрегирования данных со временем в качестве измерения, которое делится на следующие четыре категории:
2.1 Tumbling Windows
Переворачивающиеся окна — это окна, которые не перекрывают друг друга. Например, если клики товаров за последний 1 час подсчитываются каждый 1 час, то 1 день можно разделить только на 24 окна, и каждое окно не перекрывается друг с другом, следующим образом:
Здесь мы берем статистику частоты слов в качестве примера, чтобы дать конкретный вариант использования Код выглядит следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的数据输入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = value.split("\t");
for (String word : words) {
out.collect(new Tuple2<>(word, 1L));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每个单词出现的数量
env.execute("Flink Streaming");
Результаты теста следующие:
2.2 Sliding Windows
Скользящие окна используются для анализа скользящей агрегации, например, если клики всех товаров за последний час подсчитываются каждые 6 минут, статистические окна перекрывают друг друга, то есть сутки можно разделить на 240 окон. Схема выглядит следующим образом:
Можно видеть, что все четыре окна, окна 1-4, имеют одинаковое время перекрытия друг с другом. Чтобы реализовать скользящее окно, вам нужно только передать дополнительный второй параметр в качестве времени прокрутки при использовании метода timeWindow следующим образом:
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))
2.3 Session Windows
Когда пользователь постоянно просматривает страницы, данные о кликах могут быть всегда. Например, во время интервала активности пользователь может часто добавлять и удалять определенные типы продуктов в корзину, и вам нужно знать только время последнего просмотра. Состояние корзины может быть засчитано после завершения сеанса, проведенного пользователем. Чтобы получить этот тип статистики, вы можете сделать это через Session Windows.
Конкретный код реализации выглядит следующим образом:
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))
2.4 Global Windows
Последнее окно является глобальным окном.Глобальное окно будет назначать все элементы с одним и тем же ключом одному и тому же окну, которое обычно используется с триггерами. Если нет соответствующего триггера, расчет не будет выполнен.
Здесь мы продолжим использовать приведенный выше случай статистики частоты слов в качестве примера.Пример кода выглядит следующим образом:
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
3. Подсчитайте окна
Окна подсчета используются для агрегирования данных с количеством в качестве измерения.Оно также делится на скользящее окно и скользящее окно.Метод реализации точно такой же, как и у временного окна, но вызываемый API отличается следующим образом:
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)
На самом деле внутренняя часть окна подсчета реализована путем вызова глобального окна, которое мы представили в предыдущем разделе.Исходный код выглядит следующим образом:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
использованная литература
Флинк Windows:this.apache.org/projects/legal…
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным