предисловие
В последнее время мне нечего делать, поэтому я начал разбирать техники, которые использовал раньше, чтобы скоротать время. Мое предыдущее использование на основе Flink в основном использовалось для обработки в реальном времени, и я в основном использовал искру в автономном режиме.Здесь вы можете кратко понятьКраткое сравнение Flink и Spark, короче, Flink лучше в обработке в реальном времени, а оффлайн Spark лучше. По установке и развёртыванию я его здесь больше демонстрировать не буду.Я его уже давно развернул.Это в основном для демонстрации повседневного использования Flink.На работе тоже пишем напрямую локально,потом пакуем и отправляем в пряжу , Вот и все.
1.1 Введение во Flink
1.1.1 Описание флинка
официальный сайт флинка:flink.apache.org/
Apache Flink® — вычисления с отслеживанием состояния над потоками данных
Apache Flink — это фреймворк и механизм распределенной обработки дляБез границ и с границамиВычисления с сохранением состояния выполняются над потоками данных. Flink работает во всех распространенных кластерных средах и может выполнять вычисления со скоростью памяти и в любом масштабе.
Любой тип данных может формировать поток событий. Транзакции по кредитным картам, измерения датчиков, журналы машин, записи действий пользователей на веб-сайте или в мобильном приложении — все это формирует поток.
данные можно использовать какНеограниченныйилиСвязалипоток для обработки.
-
неограниченный поток: Начало потока определено, но конец потока не определен. Они бесконечно генерируют данные. Неограниченные потоки данных должны обрабатываться непрерывно, т. е. данные должны обрабатываться сразу же после их приема. Мы не можем ждать, пока все данные поступят для обработки, потому что ввод бесконечен и ввод не будет завершен в любой момент. Обработка неограниченных данных часто требует, чтобы события принимались в определенном порядке, например в порядке их возникновения, чтобы можно было сделать вывод о полноте результатов.
-
ограниченный поток: определяет начало потока и конец потока. Ограниченные потоки могут быть вычислены после приема всех данных. Все данные в ограниченном потоке могут быть упорядочены, поэтому упорядоченный прием не требуется. Обработку ограниченного потока часто называют пакетной обработкой.
Apache Flink отлично справляется с обработкой неограниченных и ограниченных наборов данных.Точный контроль времени и сохранение состояния позволяют среде выполнения Flink запускать любое приложение, которое обрабатывает неограниченные потоки. Ограниченные потоки обрабатываются внутри с помощью некоторых алгоритмов и структур данных, специально разработанных для наборов данных фиксированного размера, что обеспечивает превосходную производительность.
1.1.2 Архитектура Flink
1.2 Базовый случай входа в Flink
1.2.1 maven-зависимости
Вот зависимости проекта моей предыдущей демонстрации, включая зависимости redis, kafak, scala и т. д., а также предыдущие зависимости kylin, упакованные плагины и т. д., которые можно выбирать по мере необходимости.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.tao</groupId>
<artifactId>flinkTestDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.8.2</flink.version>
<scala.version>2.11.8</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>scala-parser-combinators_2.11</artifactId>
<groupId>org.scala-lang.modules</groupId>
</exclusion>
</exclusions>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
</exclusions>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-jdbc</artifactId>
<version>2.5.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<testExcludes>
<testExclude>/src/test/**</testExclude>
</testExcludes>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1.2.2 Версия Java для обработки в реальном времени
Здесь нам нужно установить команду nc
Как установить команду nc на Mac,windows установить команду nc,линукс установить нк. Хотя эти 3 системы были установлены, это было давно, и я мало что помню Это только для справки.
Мы используем локальный сокет в качестве источника данных для отладки. Сначала запустите сокет локально
nc -lk 8888
блокирует вот так
Java-код
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 每隔1秒统计最近2秒的单词数量
*/
public class JavaFlinkStreamingDemo {
public static void main(String[] args) throws Exception {
//拿到Stream执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//基于socket 构建数据源 数据源有几种,后面再演示
DataStreamSource<String> stream = env.socketTextStream("localhost", 8888);
/**
* 第一个泛形是输入类型
* 第二个是返回的类型
*/
SingleOutputStreamOperator<WordCount> flatStream = stream.flatMap(new FlatMapFunction<String, WordCount>() {
@Override
public void flatMap(String value, Collector<WordCount> out) throws Exception {
String[] split = value.split(",");
for (String word : split) {
out.collect(new WordCount(word, 1));
}
}
});
//这里根据对象的字段进行keyBy然后统计,最后聚合
SingleOutputStreamOperator<WordCount> sum = flatStream.keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))
.sum("count");
sum.print();
env.execute("javaFlinkStreamingDemo");
}
public static class WordCount {
public String word;
public long count;
public WordCount() {
}
public WordCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "单词:" + word + "数量" + count;
}
}
}
запустить нашу программу
Ввод данных в консоль
идея здесь
1.2.3 Версия scala для обработки в реальном времени
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object ScalaFlinkStreamingDemo {
def main(args: Array[String]): Unit = {
//导入隐式转换
import org.apache.flink.api.scala._
//步骤一:获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//步骤二:获取数据源
val textStream = env.socketTextStream("localhost",8888)
//步骤三:数据处理
val wordCountStream = textStream.flatMap(line => line.split(","))
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(2), Time.seconds(1))
.sum(1)
wordCountStream.print()
env.execute("ScalaFlinkStreamingDemo")
}
}
Здесь ведется статистика в соответствии с нашими требованиями и выполняется простой расчет в реальном времени.
1.2.4 Чтение файлов в автономном режиме
Создайте файл со следующим содержимым
код показывает, как показано ниже
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class ReadTextFileDemo {
public static void main(String[] args) throws Exception {
//拿到Stream执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//基于readTextFile 构建数据源
DataSource<String> source = env.readTextFile("/Users/xxx/workspace/flinkTestDemo/src/main/resources/wordFile.txt");
/**
* 第一个泛形是输入类型
* 第二个是返回的类型
*/
FlatMapOperator<String, Tuple2<String,Integer>> flatSource = source.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] split = value.split(",");
for (String word : split) {
out.collect(new Tuple2<String,Integer>(word, 1));
}
}
});
/**
* timeWindow
* size 第一个参数窗口的大小
* slide 第二个参数 间隔多久进行滑动
*/
AggregateOperator<Tuple2<String,Integer>> sum = flatSource.groupBy(0).sum(1);
//指定输出的路径 这里我设置了并行度为1
sum.writeAsText("/Users/xxx/workspace/flinkTestDemo/src/main/resources/result.txt").setParallelism(1);
env.execute("ReadTextFileDemo");
}
}
指定输出的路径 这里我设置了并行度为1,不设置会导致他按我的cpu核进行默认设置,相当于setParallelism(16),不同配置的电脑不一样,然后会输出一个文件夹,里面有16个文件,大部分是没有数据的,因为我这里就5行数据。并行度东西后面再写。 Как показано
SetParalLISM (1) Файл результата после запуска
2.1 DataStream, связанный с
2.1.1 Flink поставляется с коннекторами
2.1.2 source
-
на основе файлов
-
readTextFile(path)
Прочитайте текстовый файл, прочитайте строку за строкой и вернитесь.
DataSource<String> source = env.readTextFile("/Users/xxx/workspace/flinkTestDemo/src/main/resources/wordFile.txt");
- на основе сокета
сокеттекстовый поток Читать данные из сокетника, элементы можно обрезать по разделителю.
DataStreamSource<String> stream = env.socketTextStream("localhost", 8888);
- установить на основе
Чтобы создать поток данных через коллекцию коллекций java, все элементы в коллекции должны быть одного типа.
// 创建集合
ArrayList<String> data = new ArrayList<String>();
data.add("flink");
data.add("spark");
data.add("hive");
DataStreamSource<String> dataStream = env.fromCollection(data);
//..进行对应操作
dataStream.map(...).print()
env.execute(...)
- пользовательский ввод
addSource может считывать данные из сторонних источников данных
одна степень параллелизма
При реализации SourceFunction на основе источника данных с одной степенью параллелизма setParallelism() не может быть установлено больше 1, иначе возникнет исключение.Например, socketTextStream имеет одну степень параллелизма.
Ставим 2, запускаем и видим ошибку
public class CustomizeSourceSingleClass implements SourceFunction<Integer> {
boolean flag = true;
@Override
public void run(SourceContext<Integer> out) throws Exception {
while (flag){
out.collect(new Random().nextInt(1000));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
Несколько степеней параллелизма
Реализовать функцию ParallelSourceFunction
public class CustomizeSourceParalleClass implements ParallelSourceFunction<Integer> {
boolean flag = true;
@Override
public void run(SourceContext<Integer> out) throws Exception {
while (flag){
out.collect(new Random().nextInt(1000));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
}
2.1.3 Общие операторы преобразования
2.1.3.1 карта и фильтр
import demo.CustomizeSourceSingleClass;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapAndFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> numberStream = env.addSource(new CustomizeSourceSingleClass());
SingleOutputStreamOperator<Integer> dataStream = numberStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
System.out.println("map接受到了数据:"+value);
return value;
}
});
//只要大于100的数据
SingleOutputStreamOperator<Integer> streamOperator = dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer number) throws Exception {
return number > 100 ;
}
});
streamOperator.print().setParallelism(1);
env.execute("MapAndFilter");
}
}
2.1.3.2 flatMap, keyBy, сумма
пожалуйста, проверьте1.2.2 Версия Java для обработки в реальном времениможно использовать код.
2.1.3.3 union
import demo.CustomizeSourceSingleClass;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 数据类型一样才可以合并
**/
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> numberStream1 = env.addSource(new CustomizeSourceSingleClass());
DataStreamSource<Integer> numberStream2 = env.addSource(new CustomizeSourceSingleClass());
//这里我对数据流2做一个加法
SingleOutputStreamOperator<Integer> mapAdd1000 = numberStream2.map(a -> a + 1000);
DataStream<Integer> union = numberStream1.union(mapAdd1000);
//超过1000 就是第二个数据流的
SingleOutputStreamOperator<Integer> dataStream = union.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
System.out.println("map接受到了数据:"+value);
return value;
}
});
dataStream.print().setParallelism(1);
env.execute("UnionDemo");
}
}
2.1.3.4 connect
import demo.CustomizeSourceSingleClass;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectiDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> numberStream1 = env.addSource(new CustomizeSourceSingleClass());
DataStreamSource<Integer> numberStream2 = env.addSource(new CustomizeSourceSingleClass());
//这里我对数据流2做一个加法
SingleOutputStreamOperator<String> mapAddAA = numberStream2.map(a -> a + "AA");
ConnectedStreams<Integer, String> connect = numberStream1.connect(mapAddAA);
//带AA的 就是第二个数据流的
//CoMapFunction 三个参数 第一个流的数据类型,第二个流的数据类型,输出的数据类型
SingleOutputStreamOperator<Object> dataStream = connect.map(new CoMapFunction<Integer, String, Object>() {
//获取相应数据流的数据
@Override
public Object map1(Integer value) throws Exception {
return value;
}
@Override
public Object map2(String value) throws Exception {
return value;
}
});
dataStream.print().setParallelism(1);
env.execute("ConnectiDemo");
}
}
2.1.3.5 Разделить и выбрать
import demo.CustomizeSourceSingleClass;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class SplitAndSelectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> stream = env.addSource(new CustomizeSourceSingleClass());
SplitStream<Integer> splitStream = stream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
ArrayList<String> outPut = new ArrayList<>();
if (value % 2 != 0) {
outPut.add("odd");//奇数
} else {
outPut.add("even");//偶数
}
return outPut;
}
});
//选择一个或者多个切分后的流
DataStream<Integer> evenStream = splitStream.select("even");
DataStream<Integer> oddStream = splitStream.select("odd");
DataStream<Integer> moreStream = splitStream.select("odd","even");
//打印结果
evenStream.print().setParallelism(1);
env.execute("SplitAndSelectDemo");
}
}
2.1.4 работа с раковиной
2.1.4.1 print
evenStream.print().setParallelism(1);
2.1.4.2 writeAsText
stream.writeAsText("/xxx/xxx/xxx.txt").setParallelism(1);
2.1.4.3 Собственная раковина Flink
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
2.1.4.4 Пользовательский приемник
Мы также часто храним данные в Redis в нашей работе.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("localhost", 8888);
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
System.out.println(value);
return new Tuple2<>("listData", value);
}
});
//redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("node2").setPort(6379).build();
//创建sink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("SinkRedisDemo");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
//表示从数据中获取需要操作的redis的key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从数据中获取需要操作的redis的value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
//下面可以选择插入redis的命令类型
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
Затем зайдите на сервер, чтобы проверить хранилище данных Redis, вы можете видеть, что данные также хранятся здесь.
redis-cli
127.0.0.1:6379> LRANGE listData 0 10
1) "asdasd"
2) "asdsad"
3) "sadasd"
4) "asdsa"
5) "23423423 32423432 32423 4"
127.0.0.1:6379>
2.2 Набор данных, связанный с
2.2.1 source
基于文件读取
env.readTextFile(path)
基于集合读取
env.fromCollection(Collection)
2.2.2 transform
2.2.2.1 Карта и MapPartition
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
public class MapAndMapPartition {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("flink ");
data.add("spark");
data.add("hadoop");
DataSource<String> text = env.fromCollection(data);
//假设我们现在要将数据插入到数据库中
MapOperator<String, String> map = text.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
//创建连接 处理 关闭连接 每一条数据都要这样做
return value;
}
});
DataSet<String> mapPartition = map.mapPartition(new MapPartitionFunction<String, String>() {
@Override
public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
//基于一个分区创建连接
//一个分区的数据
Iterator<String> it = values.iterator();
while (it.hasNext()) {
out.collect(it.next());
}
//然后处理完毕 关闭连接
}
});
mapPartition.print();
}
}
2.2.2.2 отдельная дедупликация
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
public class DistinctDemo {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<>();
data.add("spark flink");
data.add("spark");
DataSource<String> text = env.fromCollection(data);
FlatMapOperator<String, String> flatMap = text.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] split = value.toLowerCase().split("\\W+");
for (String word : split) {
System.out.println("单词 " + word);
out.collect(word);
}
}
});
flatMap.distinct().print();
}
}
результат
2.2.2.3 join
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
public class JoinDemo {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> tuple1 = new ArrayList<>();
tuple1.add(new Tuple2<>(1, "xiaoming"));
tuple1.add(new Tuple2<>(2, "xiaoli"));
tuple1.add(new Tuple2<>(3, "wangcai"));
//tuple2<用户id,用户所在城市>
ArrayList<Tuple2<Integer, String>> tuple2 = new ArrayList<>();
tuple2.add(new Tuple2<>(1, "guangzhou"));
tuple2.add(new Tuple2<>(2, "shenzhen"));
tuple2.add(new Tuple2<>(3, "foshan"));
DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(tuple1);
DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(tuple2);
data1.join(data2).where(0) //指定第一个数据集中需要进行比较的元素角标
.equalTo(0) //指定第二个数据集中需要进行比较的元素角标
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}).print();
}
}
результат
2.2.2.4 OutJoin
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import java.util.ArrayList;
public class OutJoin {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> tuple1 = new ArrayList<>();
tuple1.add(new Tuple2<>(1, "xiaoming"));
tuple1.add(new Tuple2<>(2, "xiaoli"));
tuple1.add(new Tuple2<>(3, "wangcai"));
tuple1.add(new Tuple2<>(4, "zhangfei"));
//tuple2<用户id,用户所在城市>
ArrayList<Tuple2<Integer, String>> tuple2 = new ArrayList<>();
tuple2.add(new Tuple2<>(1, "guangzhou"));
tuple2.add(new Tuple2<>(2, "shenzhen"));
tuple2.add(new Tuple2<>(3, "foshan"));
tuple2.add(new Tuple2<>(6, "zhaoyun"));
DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(tuple1);
DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(tuple2);
//左连接 意味着second可能为空 需要增加判断
data1.leftOuterJoin(data2).where(0) //指定第一个数据集中需要进行比较的元素角标
.equalTo(0) //指定第二个数据集中需要进行比较的元素角标
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
if (second == null) {
return new Tuple3<>(first.f0, first.f1, null);
} else {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}
}).print();
System.out.println("===========" + "左连接结束" + "===========");
//右连接 意味着first可能为空 需要增加判断
data1.rightOuterJoin(data2).where(0) //指定第一个数据集中需要进行比较的元素角标
.equalTo(0) //指定第二个数据集中需要进行比较的元素角标
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
if (first == null) {
return new Tuple3<>(second.f0, null, second.f1);
} else {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}
}).print();
System.out.println("===========" + "右连接结束" + "===========");
//全连接 意味着2个 可能为空 需要增加判断
data1.fullOuterJoin(data2).where(0) //指定第一个数据集中需要进行比较的元素角标
.equalTo(0) //指定第二个数据集中需要进行比较的元素角标
.with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
@Override
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
throws Exception {
if (first == null) {
return new Tuple3<>(second.f0, null, second.f1);
}
if (second == null) {
return new Tuple3<>(first.f0, first.f1, null);
} else {
return new Tuple3<>(first.f0, first.f1, second.f1);
}
}
}).print();
System.out.println("===========" + "全连接结束" + "===========");
}
}
результат
2.2.2.5 Кросс-декартово произведение
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;
import java.util.ArrayList;
public class CrossDemo {
public static void main(String[] args) throws Exception{
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data1 = new ArrayList<>();
data1.add("A");
data1.add("B");
ArrayList<Integer> data2 = new ArrayList<>();
data2.add(1);
data2.add(2);
DataSource<String> text1 = env.fromCollection(data1);
DataSource<Integer> text2 = env.fromCollection(data2);
CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);
cross.print();
}
}
результат
2.2.2.6 sortPartition и первый
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.ArrayList;
public class FirstAndSortDemo {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(2,"aa"));
data.add(new Tuple2<>(4,"bb"));
data.add(new Tuple2<>(3,"cc"));
data.add(new Tuple2<>(1,"ab"));
data.add(new Tuple2<>(1,"bc"));
data.add(new Tuple2<>(1,"bd"));
DataSource<Tuple2<Integer, String>> dataSource = env.fromCollection(data);
GroupReduceOperator<Tuple2<Integer, String>, Tuple2<Integer, String>> first2 = dataSource.first(2);
System.out.println("=====打印前2个 基于读取顺序=====");
first2.print();
System.out.println("=====按第一个属性升序排序 基于读取顺序=====");
SortPartitionOperator<Tuple2<Integer, String>> sort1 = dataSource.sortPartition(0, Order.ASCENDING);
sort1.print();
System.out.println("=====按第一列升序排序 再按第二列倒序排序 =====");
SortPartitionOperator<Tuple2<Integer, String>> sort2 = dataSource.sortPartition(0, Order.ASCENDING)
.sortPartition(1,Order.DESCENDING);
sort2.print();
System.out.println("=====分组后 按第二列升序排列 取第一个=====");
SortPartitionOperator<Tuple2<Integer, String>> sort3 = dataSource.groupBy(0).sortGroup(1, Order.ASCENDING).first(1)
.sortPartition(1,Order.DESCENDING);
sort3.print();
}
}
результат
2.2.2.7 раздел и пользовательский раздел
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
public class PartitionDemo {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
data.add(new Tuple2<>(1, "demo1"));
data.add(new Tuple2<>(1, "demo2"));
data.add(new Tuple2<>(1, "demo3"));
data.add(new Tuple2<>(2, "demo4"));
data.add(new Tuple2<>(2, "demo5"));
data.add(new Tuple2<>(2, "demo6"));
data.add(new Tuple2<>(3, "demo7"));
data.add(new Tuple2<>(3, "demo8"));
data.add(new Tuple2<>(4, "demo9"));
data.add(new Tuple2<>(4, "demo10"));
data.add(new Tuple2<>(4, "demo11"));
data.add(new Tuple2<>(4, "demo12"));
data.add(new Tuple2<>(5, "demo13"));
data.add(new Tuple2<>(5, "demo14"));
data.add(new Tuple2<>(5, "demo15"));
data.add(new Tuple2<>(5, "demo16"));
data.add(new Tuple2<>(5, "demo17"));
data.add(new Tuple2<>(6, "demo18"));
data.add(new Tuple2<>(6, "demo19"));
data.add(new Tuple2<>(6, "demo20"));
data.add(new Tuple2<>(6, "demo21"));
DataSource<Tuple2<Integer, String>> testSource = env.fromCollection(data);
/* // 指定第一个字段 基于 hash来划分partition 相同的key 会在一个分区
testSource.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()) {
Tuple2<Integer, String> next = it.next();
System.out.println("线程id:" + Thread.currentThread().getId() + "," + next);
}
}
}).print();
//指定第一个字段作为Range来划分partition 相同数字的会在一个分区
testSource.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> it = values.iterator();
while (it.hasNext()){
Tuple2<Integer, String> next = it.next();
System.out.println("线程id:"+Thread.currentThread().getId()+","+next);
}
}
}).print();*/
//设置分区为4个
testSource.partitionCustom(new MyPartition(4), 0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
@Override
public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
Iterator<Tuple2<Integer, String>> iterator = values.iterator();
while (iterator.hasNext()) {
Tuple2<Integer, String> next = iterator.next();
System.out.println("线程id:" + Thread.currentThread().getId() + "," + next);
}
}
}).print();
}
//自定义分区器
public static class MyPartition implements Partitioner<Integer> {
private int numPartition;
public MyPartition() {
}
public MyPartition(int numPartitions) {
this.numPartition = numPartitions;
}
public int partition(Integer key, int numPartitions) {
System.out.println("分区总数" + numPartitions);
return key % numPartition;
}
}
}
Это слишком долго, чтобы публиковать результаты.
2.2.3 sink
Я прямо возьму пример с официального сайта и вставлю сюда
// 数据
DataSet<String> textData = // [...]
// 将文件数据逐行输入到指定目录
textData.writeAsText("file:///my/result/on/localFS");
// 将文件数据逐行输入到hdfs上指定目录
textData.writeAsText("hdfs://$Host:$Port/my/result/on/localFS");
// 如果文件已经存在则覆盖
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// 将tuples 按照指定的行分隔 字段分隔 写入到指定目录
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// 这将以文本格式“(a,b,c)”而不是CSV行写入元组
values.writeAsText("file:///path/to/the/result/file");
// 给当前输出指定一个格式化器,按照给定方式输出到磁盘
values.writeAsFormattedText("file:///path/to/the/result/file",
new TextFormatter<Tuple2<Integer, Integer>>() {
public String format (Tuple2<Integer, Integer> value) {
return value.f1 + " - " + value.f0;
}
});
2.2.4 широковещательные широковещательные переменные
Его можно понимать как общедоступную общую переменную. Путем трансляции набора данных набора данных на текущем узле могут быть получены разные задачи. На каждом узле существует только одна копия этих данных. Если она не используется, то каждая задача каждый узел будет копировать копию данных, расходуя память. В Spark также есть широковещательные переменные.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class BroadCastDemo {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//构建广播数据
ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
broadData.add(new Tuple2<>("zhaoyun",33));
broadData.add(new Tuple2<>("zhangfei",30));
broadData.add(new Tuple2<>("liubei",40));
DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);
//模拟真实数据
DataSource<String> data = env.fromElements("zhaoyun", "zhangfei", "liubei");
//这里使用到RichMapFunction获取广播变量,因为Rich 会多一个open方法 可以做初始化
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> map = new HashMap<String, Integer>();
/**
* 这个方法只会执行一次
* 可以在这里实现一些初始化的功能
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<String, Integer>> broadCastData = getRuntimeContext().getBroadcastVariable("broadCastData");
for (Tuple2<String, Integer> tuple2 : broadCastData) {
map.put(tuple2.f0,tuple2.f1);
}
}
@Override
public String map(String value) throws Exception {
Integer age = map.get(value);
return value + "," + age;
}
}).withBroadcastSet(tupleData, "broadCastData");//广播数据
result.print();
}
}
结果数据
zhaoyun,33
zhangfei,30
liubei,40
2.2.5 Счетчик Счетчик
По сути, он нацелен на задачи распределенных вычислений, каждая задача считает число, а затем окончательно сливает статистику, что аналогично счетчику MR.
Результат слияния можно получить только после завершения выполнения задачи.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
public class CounterDemo {
public static void main(String[] args) throws Exception {
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> data = env.fromElements("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11");
//RichMapFunction 在open 方法 进行注册
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
//创建累加器
private IntCounter numLines = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//注册累加器
getRuntimeContext().addAccumulator("num-sum", this.numLines);
}
@Override
public String map(String value) throws Exception {
//考虑到我们的任务一般是多并行度的,不能简单++1。
this.numLines.add(1);
return value;
}
}).setParallelism(16);
result.writeAsText("/Users/xxxx/workspace/flinkTestDemo/src/main/resources/result.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//拿到job 执行 结果
JobExecutionResult counter = env.execute("counter");
//在结果里面获取统计结果
int num = counter.getAccumulatorResult("num-sum");
System.out.println("总数:" + num);
}
}
结果
总数:11
наконец
На этом этапе было продемонстрировано базовое использование flink и общих операторов.Если демонстрации нет, вы можете перейти на официальный сайт или проверить информацию, чтобы протестировать ее. СледующийСостояние Флинка, контрольная точка, точка сохраненияВ основном речь идет о введении состояния Flink.Использование состояния очень важно и очень важная особенность Flink.