2.3 Apache Flink DataStream API

Flink

1. Flink запускает модель

图 Flink查询模型

Выше приведена работающая модель Flink.Программа Flink в основном состоит из трех частей, а именно Source, Transformation и Sink. DataSource в основном отвечает за чтение данных, Transformation в основном отвечает за соответствующие операции преобразования, а Sink отвечает за вывод конечных данных.

2. Архитектура программы Flink

Каждая программа Flink содержит несколько следующих процессов:

  • Получить среду выполнения; (Execution Environment)
  • загрузить/создать исходные данные; (Источник)
  • Укажите преобразование этих данных; (Преобразование)
  • Укажите место для размещения результата расчета; (Раковина)
  • Запустить выполнение программы.

3. Environment

Среда выполнения StreamExecutionEnvironment является основой всех программ Flink.

Существует три способа создания среды выполнения:

StreamExecutionEnvironment.getExecutionEnvironment 
StreamExecutionEnvironment.createLocalEnvironment 
StreamExecutionEnvironment.createRemoteEnvironment

3.1 StreamExecutionEnvironment.getExecutionEnvironment

Создайте среду выполнения, она представляет собой контекст текущего выполнения программы. Если программа называется независимой, этот метод возвращает локальную среду выполнения; если вызывающую программу из командной строки клиент фиксирует в кластере, этот метод возвращает среду выполнения этого кластера, то есть getExecutionEnvironment принимает решение о выполнении в режиме запроса Какой возврат в операционную среду, это самый распространенный способ создания среды исполнения.

val env = StreamExecutionEnvironment.getExecutionEnvironment

3.2 StreamExecutionEnvironment.createLocalEnvironment

Возвращает локальную среду выполнения и требует указания параллелизма по умолчанию при вызове.

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3.3 StreamExecutionEnvironment.createRemoteEnvironment

Вернитесь в среду выполнения кластера и отправьте файл Jar на удаленный сервер. Вам необходимо указать IP-адрес и номер порта JobManager при вызове, а также указать пакет Jar для запуска в кластере.

val env = StreamExecutionEnvironment.createRemoteEnvironment(1)

4. Source

4.1 Файловый источник данных

  • readTextFile(path)

Считывает текстовый файл, который следует за столбцом спецификации TextInputFormat, и возвращает результат в виде строки.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("/opt/modules/test.txt") stream.print() 
env.execute("FirstJob")
  • readFile(fileInputFormat, path)

Прочитайте файл в соответствии с указанным форматом файла.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val path = new Path("/opt/modules/test.txt") 
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt") 
stream.print() env.execute("FirstJob")

4.2 Источник данных на основе сокетов

  • socketTextStream

Чтение информации из сокета, элементы могут быть разделены разделителем.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 val stream = env.socketTextStream("localhost", 11111) 
stream.print() 
env.execute("FirstJob")

4.3 Источники данных на основе сбора

  • fromCollection(seq)

Создает поток данных из коллекции, в которой все элементы коллекции относятся к одному типу.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val list = List(1,2,3,4) 
val stream = env.fromCollection(list) 
stream.print() 
env.execute("FirstJob")
  • fromCollection(Iterator)

Создает поток данных из итератора, и итератор возвращает класс указанного типа данных элемента.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val iterator = Iterator(1,2,3,4) 
val stream = env.fromCollection(iterator)
stream.print() 
env.execute("FirstJob")
  • fromElements(elements:_*)

Создает поток данных из заданной последовательности объектов, все объекты должны быть одного типа.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val list = List(1,2,3,4) 
val stream = env.fromElements(list) 
stream.print() 
env.execute("FirstJob")
  • generateSequence(from, to)

Генерирует последовательность чисел параллельно из заданного интервала.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
stream.print() 
env.execute("FirstJob")

4. Sink

Приемник данных потребляет данные в потоке данных и пересылает их в файл, сокет, внешнюю систему или распечатывает их.

Flink имеет множество встроенных форматов вывода, заключенных в операции DataStream.

4.1 writeAsText

Записывает элементы построчно (TextOutputFormat) в виде строк, полученных путем вызова метода toString() каждого элемента.

4.2 WriteAsCsv

Записывает кортежи в файл (CsvOutputFormat), разделенные запятыми.Разделение между строками и полями настраивается. Значение каждого поля поступает из метода toString() объекта.

4.3 print/printToErr

Выводит значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок. Или вы можете добавить префикс к потоку вывода, это может помочь различать разные вызовы печати, если параллелизм больше 1, то вывод также будет иметь флаг, указывающий, какая задача его произвела.

4.4 writeUsingOutputFormat

Метод и базовый класс (FileOutputFormat) для вывода пользовательского файла, который поддерживает преобразование пользовательских объектов в байты.

4.5 writeToSocket

Записывает элементы в сокеты в соответствии с SerializationSchema.

5. Transformaction

5.1 Map

DataStream → DataStream: ввод параметра создает параметр.

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
val streamMap = stream.map { x => x * 2 }
streamMap.print() env.execute("FirstJob")

Примечание: stream.print(): число перед каждой строкой представляет, какой параллельный поток выводит эту строку.

5.2 FlatMap

DataStream → DataStream: введите параметр, создайте 0, 1 или более выходных данных.

val env = StreamExecutionEnvironment.getExecutionEnvironment  
val stream = env.readTextFile("test.txt") 
val streamFlatMap = stream.flatMap{     x => x.split(" ") } 
streamFilter.print() 
env.execute("FirstJob")

5.3 Filter

DataStream → DataStream: устанавливает логическое значение каждого элемента и возвращает элемент, логическое значение которого равно true. Следующий пример отфильтровывает ненулевые элементы:

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.generateSequence(1,10) 
val streamFilter = stream.filter{     x => x == 1 } 
streamFilter.print() 
env.execute("FirstJob")

5.4 Connect

图 Connect算子

DataStream, DataStream → ConnectedStreams: соедините два потока данных, которые сохраняют свой тип. После соединения двух потоков данных они помещаются только в один и тот же поток, а их данные и формы остаются внутри неизменными. Потоки не зависят друг от друга.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
 
val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
 
val streamConnect = streamMap.connect(streamCollect)
 
streamConnect.map(item=>println(item), item=>println(item))
 
env.execute("FirstJob")

5.5 CoMap,CoFlatMap

图 CoMap/CoFlatMap

ConnectedStreams → DataStream: воздействуя на ConnectedStreams, функция такая же, как у map и flatMap, и каждый поток в ConnectedStreams обрабатывается соответственно map и flatMap.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
    (str) => str + "connect",
    (in) => in + 100
)
 
env.execute("FirstJob")
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
    (str1) => str1.split(" "),
    (str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))
 
env.execute("FirstJob")

5.6 Spilt

图 Split

DataStream → SplitStream: разделите DataStream на два или более DataStream в соответствии с определенными характеристиками.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
# 字符串内容为hadoop的组成一个DataStream,其余的组成一个DataStream
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)
 
env.execute("FirstJob")

5.7 Select

图 Select

SplitStream→DataStream: получение одного или нескольких потоков данных из SplitStream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
  num =>
    (num.equals("hadoop")) match{
        case true => List("hadoop")
        case false => List("other")
    }
)
 
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()
 
env.execute("FirstJob")

5.8 Union

图 Union

DataStream → DataStream: операция объединения выполняется над двумя или более потоками данных, в результате чего создается новый поток данных, содержащий все элементы потока данных. Примечание. Если вы объедините поток данных с самим собой, вы увидите, что каждый элемент появляется дважды в новом потоке данных.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)
 
env.execute("FirstJob")

5.9 KeyBy

DataStream → KeyedStream: ввод должен быть типа Tuple, который логически разбивает поток на непересекающиеся разделы, каждый раздел содержит элементы с одним и тем же ключом, реализованным внутри как хэш.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
    x => x.split(" ")
}
val streamMap = streamFlatMap.map{
    x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")

5.10 Reduce

KeyedStream → DataStream: операция агрегирования сгруппированного потока данных, объединяющая текущий элемент и результат последней агрегации для создания нового значения, а возвращаемый поток содержит результат каждой агрегации, а не только возвращает окончательный результат последняя совокупность.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
 
val streamReduce = stream.reduce(
  (item1, item2) => (item1._1, item1._2 + item2._2)
)
 
streamReduce.print()
 
env.execute("FirstJob")

5.11 Fold

KeyedStream → DataStream: операция свертки сгруппированного потока данных с начальным значением, объединяющая текущий элемент и результат предыдущей операции свертки и создающая новое значение, возвращаемый поток содержит результаты каждой свертки, а не просто return Конечный результат последнего сгиба.

val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
 
val streamReduce = stream.fold(100)(
  (begin, item) => (begin + item._2)
)
 
streamReduce.print()
 
env.execute("FirstJob")

5.12 Aggregations

KeyedStream → DataStream: скользящие операции агрегирования потоков пакетированных данных. Разница между min и minBy заключается в том, что min возвращает минимальное значение, а minBy возвращает элемент, поле которого содержит минимальное значение (тот же принцип применяется к max и maxBy), а возвращаемый поток содержит результат каждой агрегации, а не только возвращает окончательный результат последней агрегации.

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
 
val env = StreamExecutionEnvironment.getExecutionEnvironment
 
val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
 
val streamReduce = stream.sum(1)
 
streamReduce.print()
 
env.execute("FirstJob")

Операторы до 5.10 могут напрямую воздействовать на поток, потому что они не являются агрегатными операциями, но после 5.10 вы обнаружите, что, хотя мы можем напрямую применять оператор агрегирования к неограниченным потоковым данным, но каждый результат агрегирования записывается, что часто не то, что мы хотим.На самом деле, операторы агрегации, такие как сокращение, свертывание и агрегация, все используются в сочетании с окном.Только с окном можно получить желаемые результаты.