Операции оператора во Flink

Flink

предисловие

Хорошо, давайте представим операторные операции Flink в этой статье.

Скачать Flink очень просто. После загрузки разархивируйте его, перейдите в каталог bin Flink и запустите start-cluster.sh. В это время мы можем посетить localhost: 8081, чтобы получить доступ к его красивой странице.


Чтобы остановить, используйте stop-cluster.sh

1.1 (Дополнительно) Использование Flink Shell

Для новичков легко сделать ошибки во время разработки.Если вы каждый раз упаковываете и отлаживаете, будет хлопотно и сложно найти проблему.Вы можете отлаживать в командной строке оболочки scala.

Подход оболочки scala поддерживает потоковую и пакетную обработку. При запуске командной строки оболочки автоматически создаются две разные ExecutionEnvironment. Используйте senv(Stream) и bev(Batch) для обработки потоковых и пакетных программ соответственно. (аналогично переменной sc в spark-shell)

bin/start-scala-shell.sh [local|remote|yarn] [options] <args>

Если мы столкнулись с вышеуказанной ошибкой, мы можем посмотреть информацию об этой ошибке.В ней говорится, что нам нужно подтвердить режим выполнения, поэтому нам нужно привести эту часть параметров.Есть три разных метода указания, а именно

[local | remote <host> <port> | yarn]

Давайте попробуем, начнем с местного

[root@node1 bin]# ./start-scala-shell.sh local

На данный момент я указал его режим работы локальный, и его можно успешно открыть.

···? ? ?

🤣, я полагаю, что у вас также может быть моя текущая ситуация, и сообщите об ошибке в это время"Не удалось создать DispatcherResourceManagerComponent"

На данный момент, если мы хотим решить эту проблему, мы можем cd /usr/local/flink-1.10.0/conf и добавить такой параметр

После изменения порта он может успешно работать.

Удаленный метод аналогичен методу на пряже, вы также можете запустить его, если хотите.

[root@node1 bin]# ./start-scala-shell.sh remote 192.168.200.11 8081

На данный момент мы успешно запустились, переехали

И осторожные друзья должны были обнаружить, что он показывает нам два примера пакетной обработки Flink и обработки в реальном времени.

Конечно, это не очень важно, потому что Flink-shell гораздо менее удобен в использовании, чем Spark-shell, поэтому мы просто пытаемся его открыть.

Помните, что мы говорили в то время, чтобы понять программу реального времени, нам в основном нужно понять три аспекта: источник данных, обработка данных и вывод данных, тогда давайте сначала рассмотрим источник данных Flink.

1.2.1 Введение в источник в реальном времени

source - это ввод источника данных программы, вы можете добавить источник в свою программу с помощью StreamExecutionEnvironment.addSource(sourceFunction).

flink предоставляет большое количество уже реализованных исходных методов, а также вы можете настроить исходный код (позже будет соответствующее небольшое демо, просто скопируйте его в свою IDEA и запустите):

  1. Настройка источников без параллелизма путем реализации интерфейса sourceFunction

  2. Настройте источники с помощью параллелизма, реализуя интерфейс ParallelSourceFunction или наследуя RichParallelSourceFunction.

Но в большинстве случаев мы можем использовать исходный код, который поставляется вместе с ним.

1.2.2 Как получить исходный код

1. Основанный на документе

readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。

2. В зависимости от сокета

socketTextStream
从socket中读取数据,元素可以通过一个分隔符切开。

3. Установить на основе

fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。

4. Пользовательский ввод

addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】

На официальном сайте также упоминаются другие источники данных, но в конце концов основное внимание уделяется Кафке, так что вы можете понять других.

1.2.3 Сбор источников данных (код можно скопировать и запустить напрямую)

public class StreamingSourceFromCollection {
    public static void main(String[] args) throws Exception {
        //步骤一:获取环境变量
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //步骤二:模拟数据
        ArrayList<String> data = new ArrayList<String>();
        data.add("hadoop");
        data.add("spark");
        data.add("flink");
        //步骤三:获取数据源
        DataStreamSource<String> dataStream = env.fromCollection(data);
        //步骤四:transformation操作
        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {
            @Override
            // 简单地遍历一下数据
            public String map(String word) throws Exception {
                return "testCollection_" + word;
            }
        });
        //步骤五:对结果进行处理(打印)
        addPreStream.print().setParallelism(1);
        //步骤六:启动程序
        env.execute("StreamingSourceFromCollection");

    }
}

выходной результат

1.2.4 Настройка однопараллельного источника данных (код можно скопировать и запустить напрямую)

Смоделируйте источник данных, который производит часть данных каждую секунду

/**
 * 功能:每秒产生一条数据
 */
public class MyNoParalleSource implements SourceFunction<Long> {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
         sct.collect(number);
         number++;
         //每秒生成一条数据
         Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

В это время мы обрабатываем этот источник данных, и обработка очень проста, то есть выполняется операция сопоставления и операция фильтра, и фильтр заключается в выборе четных чисел.

/**
 * 功能:从自定义的数据数据源里面获取数据,然后过滤出偶数
 */
public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收数据源
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

Результат бега есть

1.2.5 Пользовательский мультипараллельный источник данных

/**
 * 每秒产生一条数据
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {
    private long number = 1L;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning){
            sct.collect(number);
            number++;
            //每秒生成一条数据
            Thread.sleep(1000);
        }

    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}

Здесь мы видим, что мы просто реализуем другой интерфейс, а затем устанавливаем степень параллелизма в бизнес-коде.

public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 这个代码.setParallelism(2)设置了并行2
        DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

1.3 Общие операторы преобразования Flink

1.3.1 Карта и фильтр (только что продемонстрировано)

1.3.2 flatMap, keyBy, sum, union (то же, что и Spark)

1.3.3 подключение, MapFunction и coMapFunction

Операция соединения недоступна в spark, поэтому взгляните на нее. Она похожа на объединение, но может соединять только два потока. Типы данных двух потоков могут быть разными, и к ним будут применяться разные методы обработки. данные в двух потоках.Разница между CoMapFunction и MapFunction в том, что обработка данных одного потока стала обработкой двух потоков (обратите внимание, что их может быть только два).

public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //获取数据源
        //注意:针对此source,并行度只能设置为1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                // 这里是第二个数据源,字符串我加了一个前缀str_
                return "str_" + value;
            }
        });
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                // 在这里可以进行业务处理
                return value;
            }
            @Override
            public Object map2(String value) throws Exception {
                // 在这里也可以进行业务处理
                return value;
            }
        });

        //打印结果
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

Также возможно, что два потока данных в выходном результате не являются одним для вас и одним для меня, но возможно, что когда один поток быстрый, сначала идет более одного потока.

1.3.4 Разделить и выбрать

Функция этого состоит в том, чтобы разрезать поток данных на несколько потоков данных.

Может быть так, что в реальной работе в исходном потоке данных смешивается множество однотипных данных, а правила обработки разных типов данных разные, поэтому по определенным правилам,
Разделите поток данных на несколько потоков данных, чтобы каждый поток данных мог использовать различную логику обработки, иselect — это функция, которая помогает нам извлекать разные потоки

public class SplitDemo {
    public static void main(String[] args) throws  Exception {
        //获取Flink的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
        //对流进行切分,按照数据的奇偶性进行区分
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even");//偶数
                } else {
                    outPut.add("odd");//奇数
                }
                return outPut;
            }
        });

        //选择一个或者多个切分后的流
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd","even");

        //打印结果,此时我选择的全是偶数的数据
        evenStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

результат операции

1.4 Общие операторы приемника Flink

Вывод данных на самом деле относительно прост, я думаю, эту штуку можно и не разворачивать в связке с кодом, и ее можно будет грубо пропустить.

1.4.1 print() и printToErr()

Распечатать значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок

1.4.2 writeAsText()

/**
 * 数据源:1 2 3 4 5.....源源不断过来
 * 通过map打印一下接受到数据
 * 通过filter过滤一下数据,我们只需要偶数
 */
public class WriteTextDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接受到了数据:"+value);
                return value;
            }
        });
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                return number % 2 == 0;
            }
        });

    // 没有集群的小伙伴也可以指定一个本地的路径,并写入一个文件中
   filterDataStream.writeAsText("your path").setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}

1.4.3 Пользовательский приемник

В дополнение к следующему, что мы упоминали выше

Конечно, текущая ситуация с моей стороны заключается в записи данных в Redis.На данный момент нам нужно сначала ввести зависимость.

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Если вы хотите узнать о друзьях Redis, вы можете самостоятельно перейти на веб-сайт, например учебник для новичков.Следующий код был аннотирован.

/**
 * 把数据写入redis
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("xxx", xxx, "\n");
        //lpush l_words word
        //对数据进行组装,把string转化为tuple2<String,String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });
        //创建redis的配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(xxx).setPort(xxx).build();

        //创建redissink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");

    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        //表示从接收的数据中获取需要操作的redis key
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
        //表示从接收的数据中获取需要操作的redis value
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}

1.5 Операторы пакетной обработки

Пакетная обработка Flink, как правило, средняя, ​​и она редко используется в корпоративной разработке. Тем не менее, команда, ответственная за нее, по-прежнему очень усердна, поэтому предполагается, что в ближайшем будущем она станет еще лучше. нашего предыдущего Spark - функция ядра только что рассмотрена

1.5.1 source

файл на основе

readTextFile(path)

установить на основе

fromCollection(Collection)

1.5.2 transform

Обзор оператора:

  • Карта: введите элемент, а затем верните элемент, вы можете выполнить некоторую очистку и преобразование в середине

  • FlatMap: ввод элемента, может возвращать ноль, один или несколько элементов

  • MapPartition>: Аналогично карте, обработка данных одного раздела за раз [Если вам нужно получить ссылки на сторонние ресурсы во время обработки карты, рекомендуется использовать MapPartition]

  • Фильтр: функция фильтрации, оценка входящих данных, данные, соответствующие условиям, будут оставлены.

  • Уменьшить: агрегировать данные, объединить текущий элемент и значение, возвращенное последним сокращением, для выполнения операции агрегирования, а затем вернуть новое значение.

  • Агрегат: сумма, максимум, минимум и т. д.

  • Distinct: возвращает элементы в наборе данных после дедупликации, data.distinct().

  • Соединение: внутреннее соединение

  • OuterJoin: внешняя ссылка

  • Крест: получить декартово произведение двух наборов данных

  • Union: возвращает сумму двух наборов данных, типы данных должны быть согласованы.

  • First-n: получить первые N элементов в коллекции.

  • Сортировка раздела: сортирует все разделы набора данных локально и завершает сортировку нескольких полей с помощью связанного вызова sortPartition().

1.5.3 sink

  • writeAsText(): записывает элементы построчно в виде строк, полученных путем вызова метода toString() каждого элемента.
  • writeAsCsv(): записывает кортежи в файл, разделяя их запятыми.Разделение между строками и полями настраивается. Значение каждого поля поступает из метода toString() объекта.
  • print(): печатает значение метода toString() каждого элемента в стандартный поток вывода или стандартный поток вывода ошибок.

Требование: flink получает имя пользователя из источника данных и, наконец, должен распечатать имя пользователя и информацию о возрасте.

Анализ: Следовательно, необходимо получить информацию о возрасте пользователя во время промежуточной обработки карты и использовать широковещательную переменную для обработки реляционного набора данных пользователя.

Мы использовали RichMapFunction ниже.Функция этой штуки состоит в том, чтобы добавить процесс инициализации на основе mapFunction.Во время этого процесса инициализации я могу получить широковещательную переменную и получить значение возраста на карте, а затем поставить res для вывода.

public class BroadCastDemo {
    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1:准备需要广播的数据
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zhangsan",18));
        broadData.add(new Tuple2<>("lisi",19));
        broadData.add(new Tuple2<>("wangwu",20));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        //处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });
        //源数据
        DataSource<String> data = env.fromElements("zhangsan", "lisi", "wangwu");
        //注意:在这里需要使用到RichMapFunction获取广播变量
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();
            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 这个方法只会执行一次
             * 可以在这里实现一些初始化的功能
             * 所以,就可以在open方法中获取广播变量数据
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //获取广播数据
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }
            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//执行广播数据的操作
        result.print();
    }
}

Время выполнения будет больше, а напечатанный результат - Чжан Сан, Ли Си, Ван Ву и их возраст.

1.5.5 Счетчик Flink

Аккумулятор — это аккумулятор, аналогичный сценарию применения счетчика Mapreduce, он хорошо может наблюдать за изменением данных задач во время работы.
Аккумулятором можно управлять в операторной функции в задаче задания Flink, но окончательный результат работы аккумулятора можно получить только после завершения выполнения задачи.

Счетчик — это конкретная реализация аккумулятора (Accumulator).
IntCounter, LongCounter и DoubleCounter

использование

1:创建累加器
private IntCounter numLines = new IntCounter(); 
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器
this.numLines.add(1); 
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")

пример кода

public class CounterDemo {
    public static void main(String[] args) throws Exception{
        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("a", "b", "c", "d");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            //1:创建累加器
            private IntCounter numLines = new IntCounter();
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:注册累加器
                getRuntimeContext().addAccumulator("num-lines",this.numLines);

            }
            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);
        //如果要获取counter的值,只能是任务
        //result.print();
        result.writeAsText("d:\\data\\mycounter");
        JobExecutionResult jobResult = env.execute("counter");
        //3:获取累加器
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:"+num);

    }
}

Тут почти упоминается оператор пакетной обработки.Заинтересованные друзья могут скопировать код и запустить.Если вам неинтересно, то ладно.В любом случае сейчас в основном используются операторы реального времени, что мало на что влияет.

1.5.6 Состояние оператора

Вернуться к примеру с подсчетом слов

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy("0")
                .sum(1);
        result.print();
        env.execute("WordCount");
    }
}

На этом этапе следует отметить, что мы должны контролировать порт 8888, чтобы запустить программу, иначе будет сообщено об ошибке отказа в подключении. Поскольку я делаю это под Windows, поэтому у меня есть netcat, чтобы помочь.На данный момент я сначала запускаю netcat, а затем nc -lk 8888 для мониторинга порта 8888.

Затем я снова ввожу некоторые слова, в это время мы смотрим нашу информацию о печати.

4> (hadoop,1)
4> (hadoop,2)
4> (flink,1)
4> (flink,2)
1> (hive,1)
1> (hive,2)
1> (hive,3)

В настоящее время мы обнаружим, что Flink является обработкой в ​​реальном времени в истинном смысле, для обработки по одному, и вы обнаружите, что в Spark вам необходимо использовать расширенные операторы updateStateByKey или mapWithState для достижения накопления, которое может быть легко сделать во Flink.

Почему это? Просто потому, что на официальном сайте написано: Flink — это поток данных с отслеживанием состояния.

Поэтому состояние (состояние) является основным направлением нашего обучения Flink. Мы объясним позже

finally

···