Крис поможет вам быстро начать работу с Flink

Flink
Крис поможет вам быстро начать работу с Flink

I. Обзор

1.1 Эволюция технологии потоковой обработки

В мире открытого исходного кода проект Apache Storm является пионером в области потоковой обработки. Storm был впервые разработан Натаном Марцем и командой стартапа BackType, прежде чем был принят Apache Foundation. Storm обеспечивает потоковую обработку с малой задержкой, но приходится платить за производительность в реальном времени:Трудно достичь высокой производительности, и его корректность не соответствует обычно требуемой, т. е. не гарантируетexactly-once, даже на том уровне корректности, который он гарантирует, это довольно дорого.

Поддерживать хорошую отказоустойчивость в системе обработки потоков с малой задержкой и высокой пропускной способностью очень сложно, но для получения гарантированно точного состояния был придуман альтернативный подход:Разделите потоковые данные в непрерывном режиме на серию небольших пакетных заданий.. При достаточно малом разбиении (так называемые микропакетные задания) вычисления могут быть почти по-настоящему потоковыми. Полный режим реального времени невозможен из-за задержки, но каждое простое приложение может обеспечить задержку всего в несколько секунд или даже доли секунды. Это подход, используемый Spark Streaming, работающим на пакетном движке Spark.

Что еще более важно, при микропакетном подходе можно достичь семантики ровно один раз, что гарантирует согласованность состояний. В случае сбоя микропакета его можно запустить повторно, что проще, чем подход с непрерывной потоковой обработкой. Storm Trident является расширением Storm, и его основной механизм обработки потоков основан на методе микропакетов для выполнения вычислений, таким образом реализуя семантику ровно один раз, но он платит большую цену с точки зрения задержки.

Для стратегий микропакетов, таких как Storm Trident и Spark Streaming, их можно разделить только в соответствии с кратностью времени пакетного задания, а данные о событиях нельзя разделить в соответствии с реальной ситуацией.Более того, для некоторых заданий, чувствительных к задержке, разработчикам часто приходится писать бизнес-код, для повышения производительности требуется много усилий. Эти недостатки в гибкости и выразительности замедляют разработку этих стратегий микропакетов и повышают затраты на эксплуатацию и обслуживание.

В результате появился Flink.Эта техническая структура позволяет избежать вышеперечисленных недостатков, имеет много необходимых функций и может эффективно обрабатывать данные в соответствии с непрерывными событиями.Некоторые особенности Flink показаны на следующем рисунке:

1.2 Первое знакомство с Flink

Flink возник из проекта Stratosphere.Stratosphere — это исследовательский проект, совместно осуществляемый тремя университетами Берлина и некоторыми другими университетами Европы с 2010 по 2014 год.В апреле 2014 года код Stratosphere был скопирован и передан в дар Apache Software Fund.Да , первые члены, участвующие в этом инкубационном проекте, являются основными разработчиками системы Stratosphere.В декабре 2014 года Flink стал проектом верхнего уровня Apache Software Foundation.

В переводе с немецкого слово Flink означает «быстрый и ловкий», а в качестве логотипа проекта используется красочный рисунок белки не только потому, что белки быстрые и ловкие, но и потому, что берлинские белки имеют привлекательный красновато-коричневый цвет, а The Flink’s The Логотип белки имеет симпатичный хвост, а цвет хвоста перекликается с цветом логотипа Apache Software Foundation, что означает, что это белка в стиле Apache.

Ссылка на официальный сайт

На главной странице Flink вверху представлена ​​философия проекта: "Apache Flink — это платформа обработки потоков с открытым исходным кодом для распределенных, высокопроизводительных, готовых к использованию и точных приложений обработки потоков.".

Apache Flink — это платформа и механизм распределенной обработки для вычислений с отслеживанием состояния на неограниченных и ограниченных потоках данных. Flink предназначен для работы во всех распространенных кластерных средах, выполняя вычисления со скоростью выполнения в памяти и в произвольном масштабе.

1.3 Базовая вычислительная среда Flink

Базовой вычислительной архитектурой Flink является исполнительный механизм Flink Runtime, показанный на рисунке ниже, который представляет собой распределенную систему, способную принимать программы потока данных и выполнять их отказоустойчивым образом на одной или нескольких машинах.

Механизм выполнения Flink Runtime может работать в кластере как приложение YARN (Yet Another Resource Negotiator), в кластере Mesos или на одном компьютере (что очень полезно для отладки приложений Flink).

На приведенном выше рисунке показаны основные компоненты стека технологий Flink.Стоит отметить, чтоFlink предоставляет потоково-ориентированный интерфейс (DataStream API) и пакетно-ориентированный интерфейс (DataSet API) соответственно.. Таким образом, Flink может выполнять как потоковую, так и пакетную обработку. Расширенные библиотеки, поддерживаемые Flink, включают машинное обучение (FlinkML), обработку сложных событий (CEP) и вычисление графов (Gelly), а также API-интерфейсы таблиц для потоковой обработки и пакетной обработки соответственно.

Flink — это настоящая среда вычислений больших данных, которая сочетает в себе пакетные и потоковые вычисления, интегрирует вычисления на фоне больших данных, что не только снижает сложность обучения и работы, но и эффективно реализует объединение офлайн-вычислений и вычислений в реальном времени.

Программы, которые могут быть приняты механизмом выполнения Flink Runtime, очень эффективны, но такие программы имеют длинный код и трудоемки для написания.По этой причине Flink предоставляет API-интерфейсы, инкапсулированные в механизме выполнения, чтобы помочь пользователям легко создавать программы потоковой передачи.Flink предоставляет API DataStream для потоковой обработки и API DataSet для пакетной обработки.. Стоит отметить, что, хотя исполнительный механизм Flink Runtime основан на потоковой обработке, API DataSet был разработан до API DataStream, потому что спрос отрасли на обработку бесконечных потоков в начале рождения Flink был невелик.

API DataStream может свободно анализировать бесконечные потоки данных и может быть реализован на Java или Scala. Разработчикам необходимо разрабатывать на основе структуры данных под названием DataStream, которая используется для представления бесконечного распределенного потока данных.

Распределенная особенность Flink заключается в том, что он может работать на сотнях или тысячах машин, делит большие вычислительные задачи на множество мелких частей, и каждая машина выполняет часть.. Flink может автоматически гарантировать, что вычисления могут быть продолжены в случае сбоя машины или другой ошибки, или они могут быть выполнены снова запланированным образом после исправления ошибки или выполнения обновления версии. Эта возможность позволяет разработчикам не беспокоиться о сбоях. Flink по своей сути использует отказоустойчивую потоковую передачу данных, что позволяет разработчикам анализировать данные, которые постоянно генерируются и никогда не заканчиваются (т. е. потоковая обработка).

Две базовые архитектуры Flink

2.1 Диспетчер заданий и диспетчер задач

Среда выполнения Flink содержит два типа процессоров:

** Процессор JobManager: ** Также называется Мастер, используется для координации распределенного выполнения, они используются для планирования задач, координации контрольных точек, восстановления при сбое координации и т. д. При работе Flink присутствует как минимум один мастер-процессор.Если настроен режим высокой доступности, то будет несколько мастер-процессоров, один из которых является ведущим, а остальные резервными.

Обработчик TaskManager: также называемый рабочим, он используется для выполнения задачи потока данных (или специальной подзадачи), буферизации данных и обмена потоками данных.При работе Flink будет как минимум один рабочий процессор.

Простая схема выглядит следующим образом

Главный и рабочий процессоры можно запускать непосредственно на физической машине или через структуру планирования ресурсов, такую ​​как YARN.

Worker подключается к Master, сообщает о своей доступности и получает задания.

2.2 Неограниченные и ограниченные потоки данных

Flink используется для обработки ограниченных и неограниченных данных:

Неограниченный поток данных:Неограниченный поток данных имеет начало, но не имеет конца, они не завершаются при генерации и предоставляют данные, неограниченные потоки должны обрабатываться непрерывно, то есть события должны обрабатываться сразу после выборки. Для неограниченных потоков данных мы не можем ждать поступления всех данных, потому что ввод не ограничен и не будет завершен в любой момент времени. Обработка неограниченных данных часто требует, чтобы события извлекались в определенном порядке (например, в том порядке, в котором они произошли), чтобы можно было вывести целостность результата, обработка неограниченных потоков называетсяпотоковая обработка.

ограниченный поток данных:Ограниченный поток данных имеет четко определенные начало и конец., ограниченный поток можно обрабатывать, извлекая все данные перед выполнением каких-либо вычислений,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序, обработка ограниченных потоков также известна какпакетная обработка.

В неограниченных потоках данных и ограниченных потоках данных мы упомянули пакетную обработку и потоковую обработку, которые являются двумя распространенными методами обработки данных в системах обработки больших данных.

Характеристики пакетной обработки ограничены, постоянны и велики. Пакетная обработка очень удобна для вычислительной работы, требующей доступа к полному набору записей. Она обычно используется для автономной статистики..Потоковая обработка характеризуется неограниченной производительностью в режиме реального времени.Вместо выполнения операций над всем набором данных потоковая обработка работает с каждым элементом данных, который передается через систему., обычно используется для статистики в реальном времени.

В экосистеме Spark для пакетной обработки и потоковой обработки используются разные технические платформы. Пакетная обработка реализуется SparkSQL, а потоковая обработка реализуется Spark Streaming. Эта стратегия также принята большинством платформ. Для реализации пакетной обработки используются независимые процессоры. обработки и потоковой обработки, в то время как Flink может реализовывать пакетную и потоковую обработку одновременно.

Как Flink реализует пакетную и потоковую обработку одновременно? ответ,Flink рассматривает пакетную обработку (то есть обработку ограниченного количества статических данных) как особый вид потоковой обработки..

Apache Flink — это вычислительная платформа с открытым исходным кодом для распределенной обработки потоков данных и пакетной обработки данных. Основанная на той же среде выполнения Flink (Flink Runtime), она предоставляет функции, поддерживающие как потоковую, так и пакетную обработку приложений.. Существующие вычислительные решения с открытым исходным кодом рассматривают потоковую обработку и пакетную обработку как два разных типа приложений, поскольку их цели совершенно разные:Потоковая обработка, как правило, должна поддерживать низкую задержку и гарантии «Точно один раз».Пакетная обработка должна поддерживать высокую пропускную способность и эффективную обработку., поэтому в реализации обычно даются отдельно два набора методов реализации, либо каждая из схем обработки реализуется через независимый фреймворк с открытым исходным кодом. Например, решения с открытым исходным кодом для пакетной обработки включают MapReduce, Tez, Crunch и Spark, а решения с открытым исходным кодом для потоковой обработки включают Samza и Storm.

Когда Flink реализует потоковую и пакетную обработку, это полностью отличается от некоторых традиционных решений, он смотрит на потоковую и пакетную обработку с другой точки зрения и объединяет их:Flink полностью поддерживает потоковую обработку, что означает, что поток входных данных не ограничен при просмотре в виде потоковой обработки.;Пакетная обработка рассматривается как особый вид потока, за исключением того, что его входной поток данных определяется как ограниченный.. На основе одной и той же среды выполнения Flink (Flink Runtime) предоставляются API потоковой и пакетной обработки соответственно, и эти два API также являются основой для реализации фреймворков приложений потоковой обработки и пакетного типа верхнего уровня.

2.3 Модель программирования потока данных

Flink предоставляет различные уровни абстракции для разработки потоковых или пакетных заданий, как показано на следующей диаграмме.

Самый низкий уровень абстракции обеспечивает только потоки с отслеживанием состояния, которые встроены в API DataStream через функции обработки. Низкоуровневая функция процесса (Process Function) интегрирована с DataStream API, что позволяет выполнять низкоуровневые абстракции над некоторыми конкретными операциями, позволяет пользователям свободно обрабатывать события из одного или нескольких потоков данных, а также использовать согласованные ошибки. толерантное состояние. В дополнение к этому пользователи могут регистрировать время событий и обрабатывать обратные вызовы времени, что позволяет программам выполнять сложные вычисления.

По факту,Большинству приложений не нужны упомянутые выше низкоуровневые абстракции, вместо этого они программируют основные API, такие как API DataStream (ограниченные или неограниченные потоковые данные) и API DataSet (ограниченные наборы данных).. Эти API-интерфейсы предоставляют общие стандартные блоки для обработки данных, такие как различные определяемые пользователем преобразования, объединения, агрегации, окна и многое другое. API DataSet обеспечивает дополнительную поддержку для ограниченных наборов данных, таких как циклы и итерации. Типы данных, обрабатываемые этими API, представлены соответствующими языками программирования в виде классов.

Table API ориентирован на таблицы, где таблицы могут динамически изменяться (при выражении потоковых данных). API таблиц следует (расширенной) реляционной модели: таблицы имеют двумерную структуру данных (схему) (аналогичную таблицам в реляционных базах данных), а API предоставляет сопоставимые операции, такие как выбор, проект, объединение, группировка, агрегирование. , и т.д. . Программы Table API декларативно определяют, какие логические операции должны выполняться, а не определяют, как именно должен выглядеть код для этих операций. Хотя Table API можно расширить с помощью различных типов определяемых пользователем функций (UDF), он по-прежнему не такой выразительный, как основной API, но его гораздо проще использовать (меньше кода). Кроме того, программы Table API оптимизируются встроенным оптимизатором перед выполнением.

Вы можете легко переключаться между таблицами и потоками данных/наборами данных, чтобы позволить программам смешивать API таблиц с потоками данных и наборами данных..

Абстракцией высшего уровня, предоставляемой Flink, является SQL. Этот уровень абстракции аналогичен Table API с точки зрения синтаксиса и выразительности, но выражает программы в форме выражений запроса SQL. Абстракция SQL тесно взаимодействует с Table API, и SQL-запросы могут выполняться непосредственно в таблицах, определенных Table API.

Строительство трех кластеров Flink

Методы развертывания, которые может выбрать Flink:

Local, Standalone (низкое использование ресурсов), Yarn, Mesos, Docker, Kubernetes, AWS.

В основном мы анализируем развертывание кластера Flink в автономном режиме и режиме Yarn.

3.1 Установка в автономном режиме

Устанавливаем кластер Flink в автономном режиме и подготавливаем три виртуальные машины, одна из которых используется как JobManager (hadoop101), а две другие используются как TaskManager (hadoop102, hadoop103).

  1. во-первыхЗагрузка с официального сайта

  2. Затем отправьте загруженный сжатый пакет на виртуальную машину и распакуйте его в указанное место.

  3. Затем измените файл конфигурации

    [cris@hadoop101 conf]$ vim flink-conf.yaml
    

    Затем измените конфигурацию рабочего узла.

    [cris@hadoop101 conf]$ vim slaves
    

  4. Наконец, синхронизируйте Flink с двумя другими рабочими узлами.

    [cris@hadoop101 module]$ xsync flink-1.6.1/
    
  5. Команда запуска выглядит следующим образом

    [cris@hadoop101 bin]$ ./start-cluster.sh
    

    очень просто~

    Просмотр прогресса через jps

    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    2491 StandaloneSessionClusterEntrypoint
    2555 Jps
    ----------jps of hadoop102---------
    2338 Jps
    2285 TaskManagerRunner
    ----------jps of hadoop103---------
    2212 Jps
    2159 TaskManagerRunner
    
  6. Доступ к веб-интерфейсу кластера (порт 8081)

    Появится следующий интерфейс, указывающий, что кластер Flink успешно запущен.

  7. Просто запустите задачу WC

  8. закрыть кластер

    [cris@hadoop101 bin]$ ./stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 2285) on host hadoop102.
    Stopping taskexecutor daemon (pid: 2159) on host hadoop103.
    Stopping standalonesession daemon (pid: 2491) on host hadoop101.
    [cris@hadoop101 bin]$ jpsall 
    ----------jps of hadoop101---------
    3249 Jps
    ----------jps of hadoop102---------
    2842 Jps
    ----------jps of hadoop103---------
    2706 Jps
    

3.2 Установка режима пряжи

Первые четыре шага такие же, как и в автономном режиме.

  1. Укажите, что переменная среды HADOOP_HOME установлена ​​в виртуальной машине.

  2. Запустите кластер Hadoop (HDFS и Yarn)

  3. Отправьте Yarn-Session на узел hadoop101 и используйте скрипт yarn-session.sh в каталоге bin каталога установки, чтобы отправить:

    [cris@hadoop101 ~]$ /opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 6 -jm 1024 -tm 1024 -nm test -d
    

    в:

    -n(--container): количество диспетчеров задач.

    -s(--slots): количество слотов для каждого диспетчера задач, по умолчанию — один слот и одно ядро, а количество слотов по умолчанию для каждого диспетчера задач — 1.

    -jm: Память JobManager (в МБ).

    -tm: Память (в МБ) каждого диспетчера задач.

    -nm: Имя приложения пряжи (теперь имя в пользовательском интерфейсе пряжи).

    -d: Фоновое выполнение.

  4. Просмотрите веб-страницу Yarn после запуска, вы увидите только что отправленный сеанс:

    Просмотр информации о процессе

  5. простая задача

    [cris@hadoop101 flink-1.6.1]$ ./bin/flink run -m yarn-cluster examples/batch/WordCount.jar
    

    Терминал выводит результат напрямую

    Глядя на веб-интерфейс

Четыре работающая архитектура Flink

4.1 Процесс отправки задания

После отправки задачи Flink клиент загружает пакет Flink Jar и конфигурацию в HDFS, а затем отправляет задачу в Yarn ResourceManager, который выделяет ресурсы контейнера и уведомляет соответствующий NodeManager о запуске ApplicationMaster.

После запуска ApplicationMaster загружает пакет Flink Jar и настраивает среду сборки, а затем запускает JobManager. После этого ApplicationMaster подает заявку на ресурсы в ResourceManager для запуска TaskManager. После того, как ResourceManager выделяет ресурсы контейнера, ApplicationMaster уведомляет NodeManager об узле, где находится ресурс. для запуска диспетчера задач.

NodeManager загружает пакет Flink Jar, настраивает среду сборки и запускает TaskManager. После запуска TaskManager он отправляет пакет пульса в JobManager и ждет, пока JobManager назначит ему задачи.

4.2 Диспетчер задач и слоты

Каждый TaskManager — это процесс JVM, который может выполнять одну или несколько подзадач в отдельных потоках.. Чтобы контролировать, сколько задач может получить рабочий, рабочие контролируются слотами задач (у рабочего есть как минимум один слот задач). ·

Каждый слот задачи представляет собой подмножество ресурсов фиксированного размера, которыми владеет TaskManager. Если TaskManager имеет три слота, он разделит управляемую им память на три для каждого слота.Размещение ресурсов означает, что подзадаче не нужно будет конкурировать с подзадачами из других заданий за управляемую память, вместо этого у нее будет определенный объем памяти в резерве.. Следует отметить, что изоляция ЦП здесь не задействована, а слоты в настоящее время используются только для изоляции управляемой памяти задач.

Позволяет пользователям определять, как подзадачи изолированы друг от друга, регулируя количество слотов задач.. Если TaskManager имеет один слот, это будет означать, что каждая группа задач работает в отдельной JVM (JVM может быть запущена определенным контейнером), а несколько слотов TaskManager означает, что несколько подзадач могут совместно использовать одну и ту же JVM. Задачи в одном и том же процессе JVM будут совместно использовать соединения TCP (на основе мультиплексирования) и сообщения пульса. Они также могут совместно использовать наборы данных и структуры данных, что снижает нагрузку на каждую задачу.

TaskSlot — это статическая концепция, которая относится к возможности одновременного выполнения TaskManager**, которую можно настроить с помощью параметра taskmanager.numberOfTaskSlots, иПараллелизм Параллелизм — это динамическая концепция, то есть возможность параллелизма, фактически используемая TaskManager при запуске программы., который можно настроить через параметр parallelism.default.

То есть, если предположить, что всего имеется 3 диспетчера задач, и каждому диспетчеру задач назначено 3 слота задач, то есть каждый менеджер задач может получить 3 задачи, всего 9 слотов задач, если мы установим parallelism.default=1, то есть работающая программа по умолчанию Степень параллелизма равна 1, используется только 1 из 9 слотов задач, а 8 простаивают, поэтому установка подходящей степени параллелизма может повысить эффективность.

4.3 Dataflow

Программа Flink состоит из трех основных компонентов: Источник, Преобразование и Приемник. Источник в основном отвечает за чтение данных, Преобразование в основном отвечает за операцию преобразования принадлежности, Приемник отвечает за вывод окончательных данных и поток данных между ними. каждый компонент называется потоком (streams).

Основные строительные блоки программы Flink:поток(потоки) сконвертировать(преобразования) (следует отметить, что наборы данных, используемые Flink DataSet API, также являются внутренними потоками). Поток можно рассматривать как промежуточный результат, а преобразование — это операция, которая принимает один или несколько потоков в качестве входных данных, и операция использует эти потоки для выполнения вычислений для получения одного или нескольких потоков результатов.

Во время выполнения программы, работающие на Flink, сопоставляются с потоковыми потоками данных, которые содержат операторы потоков и преобразований.. Каждый поток данных начинается с одного или нескольких источников и заканчивается одним или несколькими приемниками.Поток данных подобен любому направленному ациклическому графу (DAG).

4.4 Параллельные потоки данных

Выполнение программ Flink является параллельным и распределенным. Во время выполнения поток содержит один или несколько разделов потока, а каждый оператор содержит одну или несколько подзадач оператора, которые выполняются независимо друг от друга в разных потоках, на разных физических машинах или в разных контейнерах.

Количество подзадач конкретного оператора называется его параллелизмом (параллелизмом). Параллелизм потока всегда равен параллелизму его производящего оператора. В программе разные операторы могут иметь разную степень параллелизма.

Потоки могут передавать данные между операторами в режиме «один-к-одному» (пересылке) или в режиме перераспределения, в зависимости от типа оператора.

One-to-one:Потоки (например, между источником и оператором карты) поддерживают разделы и порядок элементов.. Это означает, что количество и порядок элементов, видимых подзадачей оператора карты, совпадают с количеством и порядком элементов, созданных подзадачей оператора источника, а такие операторы, как map, fliter и flatMap, являются одним -однозначные соответствия.

Redistributing:Эта операция изменит количество разделов данных. Каждая подзадача оператора отправляет данные в разные целевые подзадачи в соответствии с выбранным преобразованием. Например, перераспределения keyBy() на основе хэш-кода, широковещательной передачи и перебалансировки будут случайным образом перераспределяться, эти операторы вызовут процесс перераспределения, а процесс перераспределения аналогичен процессу перемешивания в Spark.

4.5 задачи и цепочки операторов

В целях распределенного выполнения Flink связывает подзадачи операторов вместе, чтобы сформировать задачи, и каждая задача выполняется в потоке.. Объединение операторов в задачи — очень эффективная оптимизация:Это может уменьшить переключение между потоками и обмен данными на основе области буфера и повысить пропускную способность при одновременном уменьшении задержки.. Поведение ссылки можно указать в программном API.

На следующем рисунке показаны 5 подзадач, выполняемых в 5 параллельных потоках:

4.6 Процесс планирования задач

Клиент не является частью среды выполнения и выполнения программы, но используется для подготовки и отправки потока данных Мастеру. Затем клиент отключается или остается подключенным для ожидания получения результатов вычислений. Клиент может работать двумя способами: либо как Java Часть программы /Scala запускается программой или выполняется с помощью командной строки ./bin/flink run.

API Five Flink DataStream

5.1 Работающая модель Flink

Выше приведена рабочая модель Flink.Программа Flink в основном состоит из трех частей, а именно Source, Transformation и Sink. DataSource в основном отвечает за чтение данных, Transformation в основном отвечает за соответствующие операции преобразования, а Sink отвечает за вывод конечных данных.

5.2 Архитектура программы Flink

Каждая программа Flink содержит несколько следующих процессов:

  • Получить среду выполнения; (Execution Environment)

  • загрузить/создать исходные данные; (Источник)

  • Укажите преобразование этих данных; (Преобразование)

  • Укажите место для размещения результата расчета; (Раковина)

  • запускать выполнение программы

5.3 Environment

Среда выполнения StreamExecutionEnvironment является основой всех программ Flink..

Существует три способа создания среды выполнения:

StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment

StreamExecutionEnvironment.getExecutionEnvironment

Создает среду выполнения, представляющую контекст выполняемой в данный момент программы. Если программа вызывается независимо, этот метод возвращает локальную среду выполнения, если программа вызывается из клиента командной строки для отправки в кластер, этот метод возвращает среду выполнения для этого кластера, то есть getExecutionEnvironment определяется исходя из того, как выполняется запрос. Тип рабочей среды, который возвращается, является наиболее часто используемым способом создания среды выполнения.

val env = StreamExecutionEnvironment.getExecutionEnvironment

5.4 Source

I Файловый источник данных

  1. readTextFile(path)

    Считывает текстовый файл, соответствующий спецификации TextInputFormat, столбец за столбцом, и возвращает результат в виде строки.

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. 读取指定路径的文本文件
        val stream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    
        // 3. action 算子对 DataStream 中的数据打印
        stream.print()
    
        // 4. 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

    Результат печати терминала

    1> apache spark hadoop flume
    1> kafka hbase hive flink
    4> apache spark hadoop flink
    5> kafka hbase hive flink
    6> sqoop hue oozie zookeeper
    8> apache spark hadoop flume
    3> kafka hbase oozie zookeeper
    2> sqoop hue oozie zookeeper
    7> flink oozie azakaban spark
    

    Уведомление:stream.print(): Число перед каждой строкой представляет, какой параллельный поток выводит строку.

    Вы также можете читать файлы в соответствии с указанным fileInputFormat

    readFile(fileInputFormat, path)

  2. Источник данных на основе сокетов

    Чтение информации из сокета

    object Test {
      def main(args: Array[String]): Unit = {
        // 1. 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        
        val stream: DataStream[String] = executionEnvironment.socketTextStream("localhost", 1234)
    
        // 3. action 算子对 DataStream 中的数据打印
        stream.print()
    
        // 4. 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

  3. Источники данных на основе коллекции

    1. fromCollection(seq): создать поток данных из коллекции, все элементы в коллекции относятся к одному типу.

      val stream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2,3,4))
      
    2. fromCollection(Iterator): Создает поток данных из итератора (Iterator), итератор возвращает класс указанного типа данных элемента

      val stream: DataStream[Int] = executionEnvironment.fromCollection(Iterator(3,1,2))
      
    3. fromElements(elements:_*): создает поток из заданной последовательности объектов, все объекты должны быть одного типа

      val list = List(1,2,3)
      val stream: DataStream[List[Int]] = executionEnvironment.fromElements(list)
      
    4. generateSequence(from, to): генерировать последовательность чисел параллельно из заданного интервала

      val stream: DataStream[Long] = executionEnvironment.generateSequence(1,10)
      

5.5 Sink

Приемник данных потребляет данные в потоке данных и пересылает их в файл, сокет, внешнюю систему или распечатывает их.

Flink имеет множество встроенных форматов вывода, заключенных в операции DataStream.

1. writeAsText

Записывает элементы построчно (TextOutputFormat) в виде строк, полученных путем вызова метода toString() каждого элемента.

2. WriteAsCsv

Записывает элементы в файл, разделенные запятыми (CsvOutputFormat).Разделение между строками и полями настраивается. Значение каждого поля поступает из метода toString() объекта.

3. print/printToErr

Выводит значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок. Или вы можете добавить префикс к потоку вывода, это может помочь различать разные вызовы печати, если параллелизм больше 1, то вывод также будет иметь флаг, указывающий, какая задача его произвела.

4. writeUsingOutputFormat

Метод и базовый класс (FileOutputFormat) для вывода пользовательского файла, который поддерживает преобразование пользовательских объектов в байты.

5. writeToSocket

Запишите элемент в сокет.

5.6 Transformation

1. map

Поток данных → Поток данных: введите параметр для создания параметра.

    // 初始化 Flink 执行环境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 针对每一行数据前面添加指定字符串
    val mapDataStream: DataStream[String] = dataStream.map("Apache:" + _)
    mapDataStream.print()

    // 启动 Flink 应用
    executionEnvironment.execute("test")

2. flatMap

Поток данных → Поток данных: Введите параметр, создайте 0, 1 или более выходов.

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    // 将每行数据按照空格分割成集合,最终 "压平"
    val mapDataStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    mapDataStream.print()

3. filter

Поток данных → Поток данных: разрешает логическое значение каждого элемента и возвращает элемент, логическое значение которого равно true.

val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
val mapDataStream: DataStream[String] = dataStream.filter(_.contains("kafka"))

4. Connect

DataStream, DataStream → ConnectedStreams: Соедините два потока данных, сохраняющих свой тип. После соединения двух потоков данных они помещаются только в один и тот же поток, а их данные и формы внутри остаются неизменными. Два потока независимы друг от друга.

  // 初始化 Flink 执行环境
    val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val listDataStream: DataStream[Int] = executionEnvironment.fromCollection(List(1, 2, 3))
    val connStreams: ConnectedStreams[String, Int] = dataStream.connect(listDataStream)
    // map函数中的第一个函数作用于 ConnectedStreams 的第一个 DataStream;第二个函数作用于第二个 DataStream
    connStreams.map(e => println(e + "-----"), println(_))

    // 启动 Flink 应用
    executionEnvironment.execute("test")

Результаты теста следующие:

Операции map и flatMap для ConnectedStreams называются CoMap, CoFlatMap.

Воздействуя на ConnectedStreams, функция такая же, как у map и flatMap, а map и flatMap обрабатываются отдельно для каждого потока в ConnectedStreams.

5. split

Поток данных → Разделенный поток: разделить поток данных на два или более потоков данных в соответствии с определенными характеристиками.

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val flatMapDStream: DataStream[String] = dataStream.flatMap(_.split(" "))
    val splitDStream: SplitStream[String] = flatMapDStream.split(e => "hadoop".equals(e) match {
      case true => List("hadoop")
      case false => List("other")
    })

    splitDStream.select("hadoop").print()

Обычно используется с оператором выбора

6. Union

Поток данных → Поток данных: операция объединения двух или более потоков данных для создания нового потока данных, содержащего все элементы потока данных. Примечание. Если вы объедините поток данных с самим собой, вы увидите, что каждый элемент появляется дважды в новом потоке данных.

    val listDStream: DataStream[Int] = executionEnvironment.fromCollection(List(1,2))
    val unionDStream: DataStream[Int] = listDStream.union(listDStream)
    unionDStream.print()

7. KeyBy

DataStream → KeyedStream: ввод должен быть типа Tuple, логически разбивает поток на непересекающиеся разделы, каждый раздел содержит элементы с одним и тем же ключом, реализованным внутри как хэш

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    result.print()

Обычно используется в сочетании с операторами агрегации, такими как сокращение

8. Reduce,Fold,Aggregations

KeyedStream → DataStream: операция агрегирования сгруппированного потока данных, объединяющая текущий элемент и результат последней агрегации для создания нового значения, а возвращаемый поток содержит результат каждой агрегации, а не возвращает только окончательный результат последней агрегации.

    val dataStream: DataStream[String] = executionEnvironment.readTextFile("test00.txt")
    val kvDStream: DataStream[(String, Int)] = dataStream.flatMap(_.split(" ")).map((_, 1))
    val result: KeyedStream[(String, Int), String] = kvDStream.keyBy(_._1)
    val reduceDStream: DataStream[(String, Int)] = result.reduce((iter1, iter2) => (iter1._1, iter1._2 + iter2._2))
    reduceDStream.print()

Можно обнаружить, что Flink не возвращает окончательные общие статистические результаты, как Spark, а возвращает результаты для каждой статистики агрегации, поэтому необходимо использовать окно Flink для статистики агрегации данных (аналогично свертыванию и агрегации).

На самом деле операторы агрегации, такие как сокращение, свертывание и агрегация, используются вместе с Window, и только с ним можно получить желаемые результаты.

fold

KeyedStream → DataStream: операция последовательного сворачивания сгруппированного потока данных с начальным значением, объединяет текущий элемент с результатом предыдущей операции сворачивания и создает новое значение, а возвращаемый поток содержит результат каждого свертывания, а не только последнего. сложите окончательный результат.

Aggregations

KeyedStream → DataStream: скользящие операции агрегирования потоков пакетированных данных. Разница между min и minBy заключается в том, что min возвращает минимальное значение, а minBy возвращает элемент, поле которого содержит минимальное значение (тот же принцип применяется к max и maxBy), а возвращаемый поток содержит результат каждой агрегации, а не только возвращает окончательный результат последней агрегации.

Шесть раз и окно (акцент)

6.1 Time

В потоковой обработке Flink используются различные концепции времени, как показано на следующем рисунке.

Event Time: время создания события. Обычно это описывается временной меткой в ​​событии, например, в собранных данных журнала каждый журнал записывает собственное время генерации, и Flink получает доступ к временной метке события через распространителя временных меток.

Ingestion Time: время ввода данных Flink.

Processing Time: локальное системное время каждого оператора, выполняющего операции, основанные на времени. Оно связано с машиной. Атрибутом времени по умолчанию является время обработки.

Например, время входа журнала во Flink — 2017-11-12 10:00:00.123, а системное время входа в окно — 2017-11-12 10:00:01 234. Содержимое журнала выглядит следующим образом. :

2017-11-02 18:37:15.624 INFO Fail over to rm2

Какое время наиболее важно для бизнеса, чтобы подсчитать количество журналов отказов за 1 минуту? -- eventTime, потому что мы хотим считать по времени создания журнала.

Обычно нам нужно указать, какие данные в журнале относятся к eventTime.

6.2 Window

Окна можно разделить на две категории:

  • CountWindow: создание окна в соответствии с указанным количеством гистограмм независимо от времени.

  • TimeWindow: генерировать окно по времени.

Для TimeWindow его можно разделить на три категории в соответствии с различными принципами реализации окна: переворачивающееся окно, скользящее окно и окно сеанса.

Для CountWindow его можно разделить на скользящее окно и скользящее окно.

1. Кувыркающиеся окна

Разделите данные в соответствии с фиксированной длиной окна.

Функции:Выравнивание по времени, фиксированная длина окна, отсутствие перекрытия.

Назначатель окна прокрутки назначает каждый элемент окну определенного размера окна. Окно прокрутки имеет фиксированный размер и не перекрывается. Например: если вы укажете скользящее окно размером 5 минут, окно будет создано, как показано ниже:

Применимая сцена: подходит для ведения статистики BI и т. д. (для выполнения агрегированных расчетов за каждый период времени).

2. Раздвижные окна

Скользящее окно является более обобщенной формой фиксированного окна.Скользящее окно состоит из фиксированной длины окна и скользящего интервала..

Функции:Выравнивание по времени, фиксированная длина окна, перекрытие.

Назначатель скользящего окна назначает элементы окнам фиксированной длины, подобно скользящим окнам, размер окна настраивается параметром размера окна, а другой параметр скользящего окна определяет, как часто запускается скользящее окно. Следовательно, скользящие окна могут перекрываться, если параметр скольжения меньше размера окна, и в этом случае элементы будут распределены по нескольким окнам.

Например, если у вас есть 10-минутное окно и 5-минутный слайд, то 5-минутное окно в каждом окне содержит часть данных, сгенерированных за предыдущие 10 минут, как показано на следующем рисунке:

Применимая сцена: статистика за последний период времени (узнайте частоту отказов интерфейса за последние 5 минут, чтобы решить, следует ли подавать сигнал тревоги).

3. Сессионные окна

Он состоит из серии событий в сочетании с тайм-аутом определенной продолжительности, аналогично сеансу веб-приложения, то есть новое окно будет создано, если в течение определенного периода времени не будут получены новые данные..

Функции:несовпадение времени.

Назначатель окна сеанса группирует элементы по активности сеанса. По сравнению со скользящими окнами и скользящими окнами, окна сеанса не имеют перекрывающихся и фиксированных времени начала и окончания. Наоборот,Когда он больше не получает элементы в течение фиксированного периода времени, т.е. происходит интервал бездействия, окно закрывается.. Окно сеанса настраивается с помощью интервала сеанса, который определяет продолжительность периода бездействия.При наступлении этого периода бездействия текущий сеанс будет закрыт, а последующие элементы будут отнесены к новому окну сеанса.

4. Window API

CountWindow

CountWindow инициирует выполнение в соответствии с количеством элементов с одним и тем же ключом в окне и вычисляет результат только для ключа, количество элементов которого достигает размера окна. Примечание. Размер окна CountWindow относится к количеству элементов с одним и тем же ключом, а не к общему количеству всех входных элементов.

  • окно прокрутки

    По умолчанию CountWindow представляет собой скользящее окно. Вам нужно только указать размер окна. Когда количество элементов достигнет размера окна, будет запущено выполнение окна.

      def main(args: Array[String]): Unit = {
        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素个数达到3的时候才会进行 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(3)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    

    Результаты теста следующие:

  • раздвижное окно

    Имена функций скользящего окна и скользящего окна точно такие же, но при передаче параметров вам нужно передать два параметра: один — window_size, а другой — скользящий_размер.

    Скользящему_размеру в следующем коде присваивается значение 2, то есть он вычисляется один раз каждый раз при получении двух данных одного и того же ключа, а диапазон окна каждого вычисления — первые 4 элемента ключа.

      def main(args: Array[String]): Unit = {
        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 只有等相同key 的元素个数达到2的时候才会对该 key 的前4条数据进行 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyDStream.countWindow(4,2)
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
      }
    }
    

TimeWindow

TimeWindow предназначен для объединения всех данных в указанном временном диапазоне в окно и одновременного расчета всех данных в окне.

  • окно прокрутки

    Временное окно Flink по умолчанию делит окно в соответствии со временем обработки и делит данные, полученные Flink, на разные окна в соответствии со временем входа во Flink.

        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每3 秒对进入该窗口的所有相同key 的数据进行reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(3))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
    

  • раздвижное окно

    Имена функций скользящего окна и скользящего окна точно такие же, но при передаче параметров вам нужно передать два параметра: один — window_size, а другой — скользящий_размер.

    Для параметра slide_size в следующем коде установлено значение 2 с, то есть окно вычисляется каждые 2 с, а диапазон окна для каждого вычисления — все элементы в пределах 4 с.

        // 初始化 Flink 执行环境
        val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        val socketDStream: DataStream[String] = executionEnvironment.socketTextStream("localhost",1234)
        val mapDStream: DataStream[(String, Int)] = socketDStream.map(e => {
          val strings: Array[String] = e.split(" ")
          (strings(0), strings(1).toInt)
        })
        val keyDStream: KeyedStream[(String, Int), Tuple] = mapDStream.keyBy(0)
        // 每2 秒对进入该窗口的所有数据进行前 4 秒数据的 reduce 和 print 操作
        val windowDStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyDStream.timeWindow(Time.seconds(4),Time
          .seconds(2))
        val reduceDStream: DataStream[(String, Int)] = windowDStream.reduce((e1,e2)=>(e1._1,e1._2+e2._2))
        reduceDStream.print()
    
        // 启动 Flink 应用
        executionEnvironment.execute("test")
    

Window Fold

Оконный поток → DataStream: функция, которая присваивает окну функцию сворачивания и возвращает свернутый результат.

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)

// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行fold操作
val streamFold = streamWindow.fold(100){
  (begin, item) =>
	begin + item._2
}

// 将聚合数据写入文件
streamFold.print()

// 执行程序
env.execute("TumblingWindow")
Aggregation on Window

Оконный поток → DataStream: объединение всех элементов в окне. Разница между min и minBy заключается в том, что min возвращает минимальное значение, а minBy возвращает элемент, содержащий поле минимального значения (тот же принцип применяется к max и maxBy).

// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建SocketSource
val stream = env.socketTextStream("localhost", 11111)

// 对stream进行处理并按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行聚合操作
val streamMax = streamWindow.max(1)

// 将聚合数据写入文件
streamMax.print()

// 执行程序
env.execute("TumblingWindow")

Семь EventTime и водяной знак

7.1 Введение в EventTime

В потоковой обработке Flink в большинстве случаев используется eventTime. Как правило, ProcessingTime или IngestionTime принудительно используются только тогда, когда eventTime нельзя использовать..

Если вы хотите использовать EventTime, вам нужно ввести атрибут времени EventTime, и метод введения выглядит следующим образом:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Время журнала здесь — это время события, которое Flink анализирует в соответствии с нашими правилами, а не время обработки по умолчанию.

Временной интервал окна остается закрытым и открытым справа, а журнал на 2019-01-25 00:00:06 войдет во второе окно.

7.2 Введение водяного знака

Мы знаем, что есть процесс и время в середине обработки потока от генерации события, до прохождения через источник, а затем до оператора.Хотя в большинстве случаев данные, поступающие к оператору, идут в хронологическом порядке генерации события, но Не исключено, что out-of-order происходит по сетевым и другим причинам.Так называемый out-of-order означает, что последовательность событий, полученных Flink, не соответствует строго временной последовательности событий.

Тогда есть проблема в это время.Как только происходит беспорядок, если работа окна определяется только по событиюВремя, мы не можем знать, все ли данные на месте, но мы не можем ждать бесконечно.В это время есть Должен быть механизм для обеспечения определенного времени, после которого окно должно быть запущено для выполнения вычислений.Этот специальный механизм является водяным знаком.

Водяной знак – это механизм измерения времени выполнения события. Это скрытый атрибут самих данных, а сами данные содержат соответствующий водяной знак.

Водяной знак используется для обработки событий не по порядку, а правильная обработка событий не по порядку обычно реализуется с помощью механизма водяных знаков в сочетании с window.

Водяной знак в потоке данных используется для обозначения того, что данные, временная метка которых меньше водяного знака, уже поступили, поэтому выполнение окна также инициируется водяным знаком.

Водяной знак можно понимать как механизм срабатывания задержки.Мы можем установить время задержки t Водяного знака.Каждый раз система будет проверять максимальное maxEventTime в пришедших данных, а затем определять, что все данные, чье eventTime меньше maxEventTime - t Время остановки окна равно maxEventTime - t, затем окно запускается для выполнения.

Личная сводка: для каждой части данных, поступающих в окно, вычислите максимальное время события всех текущих данных, которые достигают окна, и вычтите это время события и время задержки (водяной знак), если разница больше, чем время окончания определенного окна. , то операция оконного оператора

Водяной знак упорядоченного потока показан на следующем рисунке: (Водяной знак установлен на 0)

Водяной знак неупорядоченного потока показан на следующем рисунке: (Водяной знак установлен на 2)

Когда Flink получает каждый фрагмент данных, он генерирует водяной знак, равный maxEvent во всех текущих поступающих данных.Time - Время задержки, то есть водяной знак, переносимый данными.Как только водяной знак, переносимый данными, наступает позже, чем время остановки текущего неактивированного окна, будет инициировано выполнение соответствующего окна. Поскольку водяные знаки переносятся данными, если новые данные не могут быть получены во время работы, окна, которые не запущены, никогда не будут запущены..

На приведенном выше рисунке мы установили максимально допустимое время задержки прибытия на 2 с, поэтому водяной знак, соответствующий событию с отметкой времени 7 с, равен 5 с, а водяной знак события с отметкой времени 12 с — 10 с. Если наше окно 1 1с~5с, окно 2 - 6с~10с, затем водяной знак, когда приходит событие с отметкой времени 7с, просто запускает окно 1, а водяной знак, когда приходит событие с отметкой времени 12с, просто запускает окно 2.

7.3 Тестовый код

    // 初始化 Flink 执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 将 Flink 时间由默认的processingTime 设置为 eventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val source: DataStream[String] = env.socketTextStream("localhost", 1234)

    // 设置watermark 以及如何解析每条日志数据中的eventTime
    val stream: DataStream[String] = source.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val time: Long = element.split(" ")(0).toLong
          println(time)
          time
        }
      }
    )

    val keyStream: KeyedStream[(String, Int), Tuple] = stream.map(e => (e.split(" ")(1), 1)).keyBy(0)
    // 设置滚动窗口的长度为5秒,及每5秒的eventTime 间隔计算一次
    val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
    val reduceStream: DataStream[(String, Int)] = windowStream.reduce(
      (e1, e2) => (e1._1, e1._2 + e2._2)
    )
    reduceStream.print()

    env.execute("test")
  }

Тест выглядит следующим образом

Если для водяного знака установлено значение 2, расчет первого окна не будет выполняться до тех пор, пока в окно не войдет лог 7000 (миллисекунд) и больше этого времени.

Если установлен тип окна SlidingEventTimeWindows, то водяной знак влияет на время расчета скользящего окна, если интересно, можете попробовать сами

Если для типа окна задано значение EventTimeSessionWindows.withGap(Time.seconds(10)), то в результате временной интервал между двумя соседними данными должен быть больше указанного времени для запуска вычисления.

Восьмое резюме

Flink – это настоящий механизм потоковых вычислений. Благодаря низкой задержке и низкой отказоустойчивости он прекрасно решает задачу ровно один раз. Поскольку Flink имеет много преимуществ, все больше и больше предприятий начинают использовать Flink. В качестве потока фреймворк обработки, исходные технические фреймворки Storm и Spark были постепенно заменены.