предисловие
Хорошо, давайте представим операторные операции Flink в этой статье.
Скачать Flink очень просто. После загрузки разархивируйте его, перейдите в каталог bin Flink и запустите start-cluster.sh. В это время мы можем посетить localhost: 8081, чтобы получить доступ к его красивой странице.
Чтобы остановить, используйте stop-cluster.sh
1. Операторы Flink
1.1 (Дополнительно) Использование Flink Shell
Для новичков легко сделать ошибки во время разработки.Если вы каждый раз упаковываете и отлаживаете, будет хлопотно и сложно найти проблему.Вы можете отлаживать в командной строке оболочки scala.
Подход оболочки scala поддерживает потоковую и пакетную обработку. При запуске командной строки оболочки автоматически создаются две разные ExecutionEnvironment. Используйте senv(Stream) и bev(Batch) для обработки потоковых и пакетных программ соответственно. (аналогично переменной sc в spark-shell)
bin/start-scala-shell.sh [local|remote|yarn] [options] <args>
Если мы столкнулись с вышеуказанной ошибкой, мы можем посмотреть информацию об этой ошибке.В ней говорится, что нам нужно подтвердить режим выполнения, поэтому нам нужно привести эту часть параметров.Есть три разных метода указания, а именно
[local | remote <host> <port> | yarn]
Давайте попробуем, начнем с местного
[root@node1 bin]# ./start-scala-shell.sh local
На данный момент я указал его режим работы локальный, и его можно успешно открыть.
···? ? ?
🤣, я полагаю, что у вас также может быть моя текущая ситуация, и сообщите об ошибке в это время"Не удалось создать DispatcherResourceManagerComponent"
На данный момент, если мы хотим решить эту проблему, мы можем cd /usr/local/flink-1.10.0/conf и добавить такой параметр
После изменения порта он может успешно работать.
Удаленный метод аналогичен методу на пряже, вы также можете запустить его, если хотите.
[root@node1 bin]# ./start-scala-shell.sh remote 192.168.200.11 8081
На данный момент мы успешно запустились, переехали
И осторожные друзья должны были обнаружить, что он показывает нам два примера пакетной обработки Flink и обработки в реальном времени.
Конечно, это не очень важно, потому что Flink-shell гораздо менее удобен в использовании, чем Spark-shell, поэтому мы просто пытаемся его открыть.
1.2 Источник данных Flink
Помните, что мы говорили в то время, чтобы понять программу реального времени, нам в основном нужно понять три аспекта: источник данных, обработка данных и вывод данных, тогда давайте сначала рассмотрим источник данных Flink.
1.2.1 Введение в источник в реальном времени
source - это ввод источника данных программы, вы можете добавить источник в свою программу с помощью StreamExecutionEnvironment.addSource(sourceFunction).
flink предоставляет большое количество уже реализованных исходных методов, а также вы можете настроить исходный код (позже будет соответствующее небольшое демо, просто скопируйте его в свою IDEA и запустите):
Настройка источников без параллелизма путем реализации интерфейса sourceFunction
Настройте источники с помощью параллелизма, реализуя интерфейс ParallelSourceFunction или наследуя RichParallelSourceFunction.
Но в большинстве случаев мы можем использовать исходный код, который поставляется вместе с ним.
1.2.2 Как получить исходный код
1. Основанный на документе
readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。
2. В зависимости от сокета
socketTextStream
从socket中读取数据,元素可以通过一个分隔符切开。
3. Установить на основе
fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。
4. Пользовательский ввод
addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
На официальном сайте также упоминаются другие источники данных, но в конце концов основное внимание уделяется Кафке, так что вы можете понять других.
- Apache Kafka (source/sink) Анализ позже
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
1.2.3 Сбор источников данных (код можно скопировать и запустить напрямую)
public class StreamingSourceFromCollection {
public static void main(String[] args) throws Exception {
//步骤一:获取环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//步骤二:模拟数据
ArrayList<String> data = new ArrayList<String>();
data.add("hadoop");
data.add("spark");
data.add("flink");
//步骤三:获取数据源
DataStreamSource<String> dataStream = env.fromCollection(data);
//步骤四:transformation操作
SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {
@Override
// 简单地遍历一下数据
public String map(String word) throws Exception {
return "testCollection_" + word;
}
});
//步骤五:对结果进行处理(打印)
addPreStream.print().setParallelism(1);
//步骤六:启动程序
env.execute("StreamingSourceFromCollection");
}
}
выходной результат
1.2.4 Настройка однопараллельного источника данных (код можно скопировать и запустить напрямую)
Смоделируйте источник данных, который производит часть данных каждую секунду
/**
* 功能:每秒产生一条数据
*/
public class MyNoParalleSource implements SourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
//每秒生成一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
В это время мы обрабатываем этот источник данных, и обработка очень проста, то есть выполняется операция сопоставления и операция фильтра, и фильтр заключается в выборе четных чисел.
/**
* 功能:从自定义的数据数据源里面获取数据,然后过滤出偶数
*/
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收数据源
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
Результат бега есть
1.2.5 Пользовательский мультипараллельный источник данных
/**
* 每秒产生一条数据
*/
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
//每秒生成一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
Здесь мы видим, что мы просто реализуем другой интерфейс, а затем устанавливаем степень параллелизма в бизнес-коде.
public class StreamingDemoWithMyPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个代码.setParallelism(2)设置了并行2
DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.3 Общие операторы преобразования Flink
1.3.1 Карта и фильтр (только что продемонстрировано)
1.3.2 flatMap, keyBy, sum, union (то же, что и Spark)
1.3.3 подключение, MapFunction и coMapFunction
Операция соединения недоступна в spark, поэтому взгляните на нее. Она похожа на объединение, но может соединять только два потока. Типы данных двух потоков могут быть разными, и к ним будут применяться разные методы обработки. данные в двух потоках.Разница между CoMapFunction и MapFunction в том, что обработка данных одного потока стала обработкой двух потоков (обратите внимание, что их может быть только два).
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
//注意:针对此source,并行度只能设置为1
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
// 这里是第二个数据源,字符串我加了一个前缀str_
return "str_" + value;
}
});
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
@Override
public Object map1(Long value) throws Exception {
// 在这里可以进行业务处理
return value;
}
@Override
public Object map2(String value) throws Exception {
// 在这里也可以进行业务处理
return value;
}
});
//打印结果
result.print().setParallelism(1);
String jobName = ConnectionDemo.class.getSimpleName();
env.execute(jobName);
}
}
Также возможно, что два потока данных в выходном результате не являются одним для вас и одним для меня, но возможно, что когда один поток быстрый, сначала идет более одного потока.
1.3.4 Разделить и выбрать
Функция этого состоит в том, чтобы разрезать поток данных на несколько потоков данных.
Может быть так, что в реальной работе в исходном потоке данных смешивается множество однотипных данных, а правила обработки разных типов данных разные, поэтому по определенным правилам,
Разделите поток данных на несколько потоков данных, чтобы каждый поток данных мог использовать различную логику обработки, иselect — это функция, которая помогает нам извлекать разные потоки
public class SplitDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
//对流进行切分,按照数据的奇偶性进行区分
SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
ArrayList<String> outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even");//偶数
} else {
outPut.add("odd");//奇数
}
return outPut;
}
});
//选择一个或者多个切分后的流
DataStream<Long> evenStream = splitStream.select("even");
DataStream<Long> oddStream = splitStream.select("odd");
DataStream<Long> moreStream = splitStream.select("odd","even");
//打印结果,此时我选择的全是偶数的数据
evenStream.print().setParallelism(1);
String jobName = SplitDemo.class.getSimpleName();
env.execute(jobName);
}
}
результат операции
1.4 Общие операторы приемника Flink
Вывод данных на самом деле относительно прост, я думаю, эту штуку можно и не разворачивать в связке с кодом, и ее можно будет грубо пропустить.
1.4.1 print() и printToErr()
Распечатать значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок
1.4.2 writeAsText()
/**
* 数据源:1 2 3 4 5.....源源不断过来
* 通过map打印一下接受到数据
* 通过filter过滤一下数据,我们只需要偶数
*/
public class WriteTextDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
// 没有集群的小伙伴也可以指定一个本地的路径,并写入一个文件中
filterDataStream.writeAsText("your path").setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.4.3 Пользовательский приемник
В дополнение к следующему, что мы упоминали выше
- Apache Kafka (source/sink) Анализ позже
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
Конечно, текущая ситуация с моей стороны заключается в записи данных в Redis.На данный момент нам нужно сначала ввести зависимость.
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
Если вы хотите узнать о друзьях Redis, вы можете самостоятельно перейти на веб-сайт, например учебник для новичков.Следующий код был аннотирован.
/**
* 把数据写入redis
*/
public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("xxx", xxx, "\n");
//lpush l_words word
//对数据进行组装,把string转化为tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("l_words", value);
}
});
//创建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(xxx).setPort(xxx).build();
//创建redissink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
//表示从接收的数据中获取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从接收的数据中获取需要操作的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
1.5 Операторы пакетной обработки
Пакетная обработка Flink, как правило, средняя, и она редко используется в корпоративной разработке. Тем не менее, команда, ответственная за нее, по-прежнему очень усердна, поэтому предполагается, что в ближайшем будущем она станет еще лучше. нашего предыдущего Spark - функция ядра только что рассмотрена
1.5.1 source
файл на основе
readTextFile(path)
установить на основе
fromCollection(Collection)
1.5.2 transform
Обзор оператора:
Карта: введите элемент, а затем верните элемент, вы можете выполнить некоторую очистку и преобразование в середине
FlatMap: ввод элемента, может возвращать ноль, один или несколько элементов
MapPartition>: Аналогично карте, обработка данных одного раздела за раз [Если вам нужно получить ссылки на сторонние ресурсы во время обработки карты, рекомендуется использовать MapPartition]
Фильтр: функция фильтрации, оценка входящих данных, данные, соответствующие условиям, будут оставлены.
Уменьшить: агрегировать данные, объединить текущий элемент и значение, возвращенное последним сокращением, для выполнения операции агрегирования, а затем вернуть новое значение.
Агрегат: сумма, максимум, минимум и т. д.
Distinct: возвращает элементы в наборе данных после дедупликации, data.distinct().
Соединение: внутреннее соединение
OuterJoin: внешняя ссылка
Крест: получить декартово произведение двух наборов данных
Union: возвращает сумму двух наборов данных, типы данных должны быть согласованы.
First-n: получить первые N элементов в коллекции.
Сортировка раздела: сортирует все разделы набора данных локально и завершает сортировку нескольких полей с помощью связанного вызова sortPartition().
1.5.3 sink
- writeAsText(): записывает элементы построчно в виде строк, полученных путем вызова метода toString() каждого элемента.
- writeAsCsv(): записывает кортежи в файл, разделяя их запятыми.Разделение между строками и полями настраивается. Значение каждого поля поступает из метода toString() объекта.
- print(): печатает значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок.
1.5.4 Широковещательные переменные Flink
Требование: flink получает имя пользователя из источника данных и, наконец, должен распечатать имя пользователя и информацию о возрасте.
Анализ: Следовательно, необходимо получить информацию о возрасте пользователя во время промежуточной обработки карты и использовать широковещательную переменную для обработки реляционного набора данных пользователя.
Мы использовали RichMapFunction ниже.Функция этой штуки состоит в том, чтобы добавить процесс инициализации на основе mapFunction.Во время этого процесса инициализации я могу получить широковещательную переменную и получить значение возраста на карте, а затем поставить res для вывода.
public class BroadCastDemo {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//1:准备需要广播的数据
ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
broadData.add(new Tuple2<>("zhangsan",18));
broadData.add(new Tuple2<>("lisi",19));
broadData.add(new Tuple2<>("wangwu",20));
DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
//处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
HashMap<String, Integer> res = new HashMap<>();
res.put(value.f0, value.f1);
return res;
}
});
//源数据
DataSource<String> data = env.fromElements("zhangsan", "lisi", "wangwu");
//注意:在这里需要使用到RichMapFunction获取广播变量
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> allMap = new HashMap<String, Integer>();
/**
* 这个方法只会执行一次
* 可以在这里实现一些初始化的功能
* 所以,就可以在open方法中获取广播变量数据
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//获取广播数据
this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
@Override
public String map(String value) throws Exception {
Integer age = allMap.get(value);
return value + "," + age;
}
}).withBroadcastSet(toBroadcast, "broadCastMapName");//执行广播数据的操作
result.print();
}
}
Время выполнения будет больше, а напечатанный результат - Чжан Сан, Ли Си, Ван Ву и их возраст.
1.5.5 Счетчик Flink
Аккумулятор — это аккумулятор, аналогичный сценарию применения счетчика Mapreduce, он хорошо может наблюдать за изменением данных задач во время работы.
Аккумулятором можно управлять в операторной функции в задаче задания Flink, но окончательный результат работы аккумулятора можно получить только после завершения выполнения задачи.
Счетчик — это конкретная реализация аккумулятора (Accumulator).
IntCounter, LongCounter и DoubleCounter
использование
1:创建累加器
private IntCounter numLines = new IntCounter();
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器
this.numLines.add(1);
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
пример кода
public class CounterDemo {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("a", "b", "c", "d");
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
//1:创建累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:注册累加器
getRuntimeContext().addAccumulator("num-lines",this.numLines);
}
//int sum = 0;
@Override
public String map(String value) throws Exception {
//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
//sum++;
//System.out.println("sum:"+sum);
this.numLines.add(1);
return value;
}
}).setParallelism(8);
//如果要获取counter的值,只能是任务
//result.print();
result.writeAsText("d:\\data\\mycounter");
JobExecutionResult jobResult = env.execute("counter");
//3:获取累加器
int num = jobResult.getAccumulatorResult("num-lines");
System.out.println("num:"+num);
}
}
Тут почти упоминается оператор пакетной обработки.Заинтересованные друзья могут скопировать код и запустить.Если вам неинтересно, то ладно.В любом случае сейчас в основном используются операторы реального времени, что мало на что влияет.
1.5.6 Состояние оператора
Вернуться к примеру с подсчетом слов
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
collector.collect(new Tuple2<>(word, 1));
}
}
}).keyBy("0")
.sum(1);
result.print();
env.execute("WordCount");
}
}
На этом этапе следует отметить, что мы должны контролировать порт 8888, чтобы запустить программу, иначе будет сообщено об ошибке отказа в подключении. Поскольку я делаю это под Windows, поэтому у меня есть netcat, чтобы помочь.На данный момент я сначала запускаю netcat, а затем nc -lk 8888 для мониторинга порта 8888.
Затем я снова ввожу некоторые слова, в это время мы смотрим нашу информацию о печати.
4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
4> (flink,2)
1> (hive,1)
1> (hive,2)
1> (hive,3)
В настоящее время мы обнаружим, что Flink является обработкой в реальном времени в истинном смысле, для обработки по одному, и вы обнаружите, что в Spark вам необходимо использовать расширенные операторы updateStateByKey или mapWithState для достижения накопления, которое может быть легко сделать во Flink.
Почему это? Просто потому, что на официальном сайте написано: Flink — это поток данных с отслеживанием состояния.
Поэтому состояние (состояние) является основным направлением нашего обучения Flink. Мы объясним позже
finally
···