1. Flink запускает модель
Выше приведена работающая модель Flink.Программа Flink в основном состоит из трех частей, а именно Source, Transformation и Sink. DataSource в основном отвечает за чтение данных, Transformation в основном отвечает за соответствующие операции преобразования, а Sink отвечает за вывод конечных данных.
2. Архитектура программы Flink
Каждая программа Flink содержит несколько следующих процессов:
- Получить среду выполнения; (Execution Environment)
- загрузить/создать исходные данные; (Источник)
- Укажите преобразование этих данных; (Преобразование)
- Укажите место для размещения результата расчета; (Раковина)
- Запустить выполнение программы.
3. Environment
Среда выполнения StreamExecutionEnvironment является основой всех программ Flink.
Существует три способа создания среды выполнения:
StreamExecutionEnvironment.getExecutionEnvironment
StreamExecutionEnvironment.createLocalEnvironment
StreamExecutionEnvironment.createRemoteEnvironment
3.1 StreamExecutionEnvironment.getExecutionEnvironment
Создайте среду выполнения, она представляет собой контекст текущего выполнения программы. Если программа называется независимой, этот метод возвращает локальную среду выполнения; если вызывающую программу из командной строки клиент фиксирует в кластере, этот метод возвращает среду выполнения этого кластера, то есть getExecutionEnvironment принимает решение о выполнении в режиме запроса Какой возврат в операционную среду, это самый распространенный способ создания среды исполнения.
val env = StreamExecutionEnvironment.getExecutionEnvironment
3.2 StreamExecutionEnvironment.createLocalEnvironment
Возвращает локальную среду выполнения и требует указания параллелизма по умолчанию при вызове.
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
3.3 StreamExecutionEnvironment.createRemoteEnvironment
Вернитесь в среду выполнения кластера и отправьте файл Jar на удаленный сервер. Вам необходимо указать IP-адрес и номер порта JobManager при вызове, а также указать пакет Jar для запуска в кластере.
val env = StreamExecutionEnvironment.createRemoteEnvironment(1)
4. Source
4.1 Файловый источник данных
- readTextFile(path)
Считывает текстовый файл, который следует за столбцом спецификации TextInputFormat, и возвращает результат в виде строки.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("/opt/modules/test.txt") stream.print()
env.execute("FirstJob")
- readFile(fileInputFormat, path)
Прочитайте файл в соответствии с указанным форматом файла.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = new Path("/opt/modules/test.txt")
val stream = env.readFile(new TextInputFormat(path), "/opt/modules/test.txt")
stream.print() env.execute("FirstJob")
4.2 Источник данных на основе сокетов
- socketTextStream
Чтение информации из сокета, элементы могут быть разделены разделителем.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("localhost", 11111)
stream.print()
env.execute("FirstJob")
4.3 Источники данных на основе сбора
- fromCollection(seq)
Создает поток данных из коллекции, в которой все элементы коллекции относятся к одному типу.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromCollection(list)
stream.print()
env.execute("FirstJob")
- fromCollection(Iterator)
Создает поток данных из итератора, и итератор возвращает класс указанного типа данных элемента.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val iterator = Iterator(1,2,3,4)
val stream = env.fromCollection(iterator)
stream.print()
env.execute("FirstJob")
- fromElements(elements:_*)
Создает поток данных из заданной последовательности объектов, все объекты должны быть одного типа.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = List(1,2,3,4)
val stream = env.fromElements(list)
stream.print()
env.execute("FirstJob")
- generateSequence(from, to)
Генерирует последовательность чисел параллельно из заданного интервала.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
stream.print()
env.execute("FirstJob")
4. Sink
Приемник данных потребляет данные в потоке данных и пересылает их в файл, сокет, внешнюю систему или распечатывает их.
Flink имеет множество встроенных форматов вывода, заключенных в операции DataStream.
4.1 writeAsText
Записывает элементы построчно (TextOutputFormat) в виде строк, полученных путем вызова метода toString() каждого элемента.
4.2 WriteAsCsv
Записывает кортежи в файл (CsvOutputFormat), разделенные запятыми.Разделение между строками и полями настраивается. Значение каждого поля поступает из метода toString() объекта.
4.3 print/printToErr
Выводит значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок. Или вы можете добавить префикс к потоку вывода, это может помочь различать разные вызовы печати, если параллелизм больше 1, то вывод также будет иметь флаг, указывающий, какая задача его произвела.
4.4 writeUsingOutputFormat
Метод и базовый класс (FileOutputFormat) для вывода пользовательского файла, который поддерживает преобразование пользовательских объектов в байты.
4.5 writeToSocket
Записывает элементы в сокеты в соответствии с SerializationSchema.
5. Transformaction
5.1 Map
DataStream → DataStream: ввод параметра создает параметр.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
val streamMap = stream.map { x => x * 2 }
streamMap.print() env.execute("FirstJob")
Примечание: stream.print(): число перед каждой строкой представляет, какой параллельный поток выводит эту строку.
5.2 FlatMap
DataStream → DataStream: введите параметр, создайте 0, 1 или более выходных данных.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{ x => x.split(" ") }
streamFilter.print()
env.execute("FirstJob")
5.3 Filter
DataStream → DataStream: устанавливает логическое значение каждого элемента и возвращает элемент, логическое значение которого равно true. Следующий пример отфильтровывает ненулевые элементы:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10)
val streamFilter = stream.filter{ x => x == 1 }
streamFilter.print()
env.execute("FirstJob")
5.4 Connect
DataStream, DataStream → ConnectedStreams: соедините два потока данных, которые сохраняют свой тип. После соединения двух потоков данных они помещаются только в один и тот же поток, а их данные и формы остаются внутри неизменными. Потоки не зависят друг от друга.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamMap = stream.flatMap(item => item.split(" ")).filter(item => item.equals("hadoop"))
val streamCollect = env.fromCollection(List(1,2,3,4))
val streamConnect = streamMap.connect(streamCollect)
streamConnect.map(item=>println(item), item=>println(item))
env.execute("FirstJob")
5.5 CoMap,CoFlatMap
ConnectedStreams → DataStream: воздействуя на ConnectedStreams, функция такая же, как у map и flatMap, и каждый поток в ConnectedStreams обрабатывается соответственно map и flatMap.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val streamFlatMap = stream1.flatMap(x => x.split(" "))
val stream2 = env.fromCollection(List(1,2,3,4))
val streamConnect = streamFlatMap.connect(stream2)
val streamCoMap = streamConnect.map(
(str) => str + "connect",
(in) => in + 100
)
env.execute("FirstJob")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val stream2 = env.readTextFile("test1.txt")
val streamConnect = stream1.connect(stream2)
val streamCoMap = streamConnect.flatMap(
(str1) => str1.split(" "),
(str2) => str2.split(" ")
)
streamConnect.map(item=>println(item), item=>println(item))
env.execute("FirstJob")
5.6 Spilt
DataStream → SplitStream: разделите DataStream на два или более DataStream в соответствии с определенными характеристиками.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
# 字符串内容为hadoop的组成一个DataStream,其余的组成一个DataStream
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
env.execute("FirstJob")
5.7 Select
SplitStream→DataStream: получение одного или нескольких потоков данных из SplitStream.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap(x => x.split(" "))
val streamSplit = streamFlatMap.split(
num =>
(num.equals("hadoop")) match{
case true => List("hadoop")
case false => List("other")
}
)
val hadoop = streamSplit.select("hadoop")
val other = streamSplit.select("other")
hadoop.print()
env.execute("FirstJob")
5.8 Union
DataStream → DataStream: операция объединения выполняется над двумя или более потоками данных, в результате чего создается новый поток данных, содержащий все элементы потока данных. Примечание. Если вы объедините поток данных с самим собой, вы увидите, что каждый элемент появляется дважды в новом потоке данных.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env.readTextFile("test.txt")
val streamFlatMap1 = stream1.flatMap(x => x.split(" "))
val stream2 = env.readTextFile("test1.txt")
val streamFlatMap2 = stream2.flatMap(x => x.split(" "))
val streamConnect = streamFlatMap1.union(streamFlatMap2)
env.execute("FirstJob")
5.9 KeyBy
DataStream → KeyedStream: ввод должен быть типа Tuple, который логически разбивает поток на непересекающиеся разделы, каждый раздел содержит элементы с одним и тем же ключом, реализованным внутри как хэш.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt")
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
val streamMap = streamFlatMap.map{
x => (x,1)
}
val streamKeyBy = streamMap.keyBy(0)
env.execute("FirstJob")
5.10 Reduce
KeyedStream → DataStream: операция агрегирования сгруппированного потока данных, объединяющая текущий элемент и результат последней агрегации для создания нового значения, а возвращаемый поток содержит результат каждой агрегации, а не только возвращает окончательный результат последняя совокупность.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
val streamReduce = stream.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
streamReduce.print()
env.execute("FirstJob")
5.11 Fold
KeyedStream → DataStream: операция свертки сгруппированного потока данных с начальным значением, объединяющая текущий элемент и результат предыдущей операции свертки и создающая новое значение, возвращаемый поток содержит результаты каждой свертки, а не просто return Конечный результат последнего сгиба.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test.txt").flatMap(item => item.split(" ")).map(item => (item, 1)).keyBy(0)
val streamReduce = stream.fold(100)(
(begin, item) => (begin + item._2)
)
streamReduce.print()
env.execute("FirstJob")
5.12 Aggregations
KeyedStream → DataStream: скользящие операции агрегирования потоков пакетированных данных. Разница между min и minBy заключается в том, что min возвращает минимальное значение, а minBy возвращает элемент, поле которого содержит минимальное значение (тот же принцип применяется к max и maxBy), а возвращаемый поток содержит результат каждой агрегации, а не только возвращает окончательный результат последней агрегации.
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("test02.txt").map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
val streamReduce = stream.sum(1)
streamReduce.print()
env.execute("FirstJob")
Операторы до 5.10 могут напрямую воздействовать на поток, потому что они не являются агрегатными операциями, но после 5.10 вы обнаружите, что, хотя мы можем напрямую применять оператор агрегирования к неограниченным потоковым данным, но каждый результат агрегирования записывается, что часто не то, что мы хотим.На самом деле, операторы агрегации, такие как сокращение, свертывание и агрегация, все используются в сочетании с окном.Только с окном можно получить желаемые результаты.