Взгляните на различные состояния Flink

Flink

предисловие

Мы, наконец, открыли нашу ключевую статью, Следующий код - это код, используемый для объяснения оператора на официальном сайте, поэтому, если вы чувствуете, что официальный сайт выглядит сложно, зайдите сюда и давайте рассмотрим его медленно.

Во-первых, снять его

1.1 Полный код wordCount от Flink

Сначала мы возвращаемся к последнему блоку подсчета слов.

/**
 * 单词计数
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy("0")
                .sum(1);

        result.print();

        env.execute("WordCount");
    }
}

1.2 Анализ потока кода ###

Логика подсчета слов не поясняется, она та же. Начните с первого предложения. Сначала мы определяем конфигурацию

Configuration conf = new Configuration();

Затем идет вход в программу

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

Это количество слов, на которое мы должны обратить внимание, для кода

env.execute("test word count");

Мой подход здесь заключается в том, чтобы генерировать исключение напрямую, почему? Потому что, если здесь в продакшене есть исключение, и у вашей задачи возникают проблемы с запуском, какой смысл его ловить, верно? Так что мы можем просто выбросить это здесь.

После этого данные можно получить из сокета, и соответствующий код

DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888).setParallelism(1);

После этого пишется количество слов, и оно не поясняется.

Из предыдущей статьи мы знаем, что если вы откроете netcat и в это время введете несколько слов, вы обнаружите, что Flink помогает нам сохранять промежуточное состояние, если через Spark Streaming.
Чтобы реализовать эту функцию, мы должны использовать расширенные операторы, такие как checkPoint, updateStateByKey или mapWithState, или мы сохраняем промежуточное состояние на каком-либо носителе, таком как Redis, Hbase и т. д. Итак, на этот раз поговорим о том, как Flink реализует эту функцию накопления через состояние.

1.3 Состояние Флинка

состояние: обычно относится к состоянию конкретной задачи/оператора. Состояние может быть записано, и данные могут быть восстановлены в случае сбоя.Два основных типа состояния: состояние с ключом, состояние оператора., оба из которых могут существовать в двух формах:Необработанное состояние и управляемое состояние

Например, после подсчета слов мы только что передали код «keyBy()», состояние следующего оператора — это состояние ключа, но если я удалю этот код, оставшийся оператор — это состояние работы, очень простое. отличительное условие — keyBy

Управляемое состояние: состояние, управляемое фреймворком Flink, которое мы обычно используем.

Исходное состояние: конкретная структура данных состояния управляется пользователем.Когда фреймворк выполняет контрольную точку, он использует byte[] для чтения и записи содержимого состояния и ничего не знает о своей внутренней структуре данных. Обычно для состояния DataStream рекомендуется использовать управляемое состояние, при реализации определяемого пользователем оператора используется необработанное состояние. Но когда мы используем Flink, мы в основном не настраиваем состояние.

1.3.1 operator state

Нет состояния операции тасования, иными словами, нет операции keyBy

  1. Состояние оператора — это состояние уровня задачи, грубо говоря, каждой задаче соответствует состояние.
  2. Каждый раздел (задача) в источнике Kafka Connector должен записывать такую ​​информацию, как раздел и смещение потребляемой темы.
  3. Состояние оператора имеет только одно управляемое состояние: ValueState.

1.3.2 Keyed State

  1. keyed state записывает состояние каждой клавиши

  2. Существует шесть типов управляемого состояния Keyed state:

  3. ValueState

  4. ListState

  5. MapState

  6. ReducingState

  7. AggregatingState

  8. FoldingState (это не слишком важно)

2. Демонстрация различных состояний

2.1 ValueState

Теперь поговорим о требованиях: при количестве полученных элементов одного ключа, равном 3 или более 3, вычисляется среднее значение значений этих элементов. Вычислить среднее значение каждых 3 элементов в ключевом потоке

Это требование нетрудно понять.В следующем коде метод main имитирует часть данных

DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), 
Tuple2.of(1L, 7L),Tuple2.of(2L, 4L), 
Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

Здесь данные с ключом 1 появляются 3 раза, и данные с ключом 2 тоже появляются 3 раза, затем я вычисляю среднее из них, это так просто. Тогда наш результат должен быть (1,5),(2,3,6666...).

2.1.1 Определить класс TestKeyedstateMain

public class TestKeyedStateMain {}

2.3 Передняя часть кода

Это очень просто, просто сначала получите запись программы, а затем добавьте данные моделирования, которые я только что упомянул.

В это время то, что мы моделируем сами, — это данные, такие как ключ-значение, поэтому мы даже сохраняем операцию flatMap, просто keyBy, и нажимаем 0 (то есть первую позицию в первом Tuple.of()) keyBy , и поскольку мы хотим взять среднее значение, а операторы, предоставляемые самим Flink, ограничены, нам нужно выполнить некоторые дополнительные операции.

На данный момент мы выбрали flatMap, но нам нужно реализовать себя для управления состоянием, что немного похоже на пользовательский оператор.

2.1.2 Кодекс государственного самоуправления

public class CountWindowAverageWithValueState
    extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>>{

}

Здесь мы определяем входной тип Tuple2 и выходной тип Tuple2, которые соответствуют паре ключ-значение смоделированных данных, и выходные данные (1,5), (2,3,6666...)

Поскольку нам нужно сохранить наше состояние через ValueState, мы инициализируем ValueState

//第一个Long用来保存key出现的次数
//第二个Long代表和key对应的value的总值
ValueState<Tuple2<Long, Long>> countAndSum;

И нам нужно обратить внимание, что у каждого ключа в нашем источнике данных будет свое соответствующее valueState.

После наследования RichFlatMapFunction мы можем переопределить два метода, один — open(), а другой — flatMap(). Метод open() будет выполняться только один раз. В open мы зарегистрируем состояние и передадим его во Flink. управлять.

Процедура регистрации состояния исправлена, ValueStateDescriptor

@Override
public void open(Configuration parameters) throws Exception {

   //注册状态,其实就是初始化一个描述,这个描述有两个参数
   //一个参数是一个名字,另一个也是固定套路,对应你Tuple的参数类型

   //比如你Tuple<Long, Long>对应就是Types.LONG, Types.LONG
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
            "average", //状态的名字
            Types.TUPLE(Types.LONG, Types.LONG));//状态存储的数据类型

    通过描述从Flink去获取状态
    countAndSum = getRuntimeContext().getState(descriptor);
}

Это фиксированная операция, вы можете сначала запомнить ее, она просто понимается как регистрация состояния, а затем получение его для использования.

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Double>> out) throws Exception {

    // 当前key出现的次数,及对应value的和
    Tuple2<Long, Long> currentState = countAndSum.value();
    //如果第一次进来,currentState为空,进行初始化,简单设置为0
    if(currentState == null){
        currentState = Tuple2.of(0L,0L);
    }
    //更新状态中的元素
    currentState.f0+=1;
    //更新状态中的总值
    currentState.f1+= element.f1;
    //更新状态
    countAndSum.update(currentState);

    //判断
    if(currentState.f0 >= 3){
        double avg=(double)currentState.f1 / currentState.f0;
        //对出对应的key及其对应的平均值
        out.collect(Tuple2.of(element.f0,avg));

        // 算了一次累计3,清除状态
        countAndSum.clear();
    }
}

Обратите внимание, что currentState.f0 и f1 здесь соответствуют двум длинным параметрам Tuple 2. Здесь flatMap основного метода дополняется связанным с состоянием CountWindowAverageWithValueState()

public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.1.3 Полный обзор кода

/**
 *  ValueState<T> :这个状态为每一个 key 保存一个值
 *  value() 获取状态值
 *  update() 更新状态值
 *  clear() 清除状态
 */
public class CountWindowAverageWithValueState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // 用以保存每个 key 出现的次数,以及这个 key 对应的 value 的总值
    // managed keyed state
    //1. ValueState 保存的是对应的一个 key 的一个状态值
    private ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 注册状态
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<Tuple2<Long, Long>>(
                        "average",  // 状态的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
        countAndSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // 拿到当前的 key 的状态值
        Tuple2<Long, Long> currentState = countAndSum.value();

        // 如果状态值还没有初始化,则初始化
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }

        // 更新状态值中的元素的个数
        currentState.f0 += 1;

        // 更新状态值中的总值
        currentState.f1 += element.f1;

        // 更新状态
        countAndSum.update(currentState);

        // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
        if (currentState.f0 >= 3) {
            double avg = (double)currentState.f1 / currentState.f0;
            // 输出 key 及其对应的平均值
            out.collect(Tuple2.of(element.f0, avg));
            //  清空状态值
            countAndSum.clear();
        }
    }
}


public class TestKeyedStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

        // 输出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();

        env.execute("TestStatefulApi");
    }
}

2.2 ListState

Он по-прежнему реализует ту же функцию, что и выше, потому что процедура в основном такая же, поэтому это не объясняется.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;

public class CountWindowAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>>{
    //List里面保存这所有的key出现的次数
    ListState<Tuple2<Long, Long>> elementByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<Tuple2<Long, Long>> descriptor = new ListStateDescriptor<>(
                "average",
                Types.TUPLE(Types.LONG, Types.LONG));
       elementByKey = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws  Exception {
        Iterable<Tuple2<Long, Long>> currentState = elementByKey.get();
        //初始化
        if(currentState == null){
            elementByKey.addAll(Collections.emptyList());
        }
        //更新状态
        elementByKey.add(element);
        ArrayList<Tuple2<Long, Long>> allElement = Lists.newArrayList(elementByKey.get());
        if(allElement.size() >= 3){
            long count=0;
            long sum=0;
            for (Tuple2<Long,Long> ele:allElement){
                count++;
                sum += ele.f1;
            }
            double avg=(double) sum/count;
            out.collect(new Tuple2<>(element.f0,avg));
            //清除数据
            elementByKey.clear();
        }

    }
}

2.4 MapState

Он также достигает тех же требований, но на самом деле с этим есть проблема, потому что mapState отличается от двух состояний выше, особенность mapState заключается в том, что он будет выполнять операцию перезаписи для того же ключа, то есть того же

Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), 
Tuple2.of(1L, 7L),Tuple2.of(2L, 4L), 
Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));

Эти данные после прихода Tuple2.of(1L, 3L) и повторного прихода Tuple2.of(1L, 5L) заменят предыдущий 3L на 5L вместо подсчета. По сути, это то же самое, что и карта Java.

Таким образом, объяснение в одном предложении заключается в том, что данные с одним и тем же ключом в mapState будут в одном и том же состоянии, поэтому на этот раз мы будем использовать ключ строкового типа, который оформлен в виде 1_1, 1_2, 1_3.

2.4.1 Предварительные условия

На этот раз мы используем String, и есть старая процедура открытого метода, которая используется после регистрации, просто запомните эту процедуру.

public class CountWindowAverageWithMapState
    extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Double>> {
        private MapState<String,Long> mapState;
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
                    "average",
                    String.class, Long.class);
            mapState = getRuntimeContext().getMapState(descriptor);
        }
    }

2.4.2 Написание плоской карты

На этом этапе мы можем поместить элемент в эту структуру карты.

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Double>> out) throws Exception {
    mapState.put(UUID.randomUUID().toString(),element.f1);
    ArrayList<Long> arrayList = Lists.newArrayList(mapState.values());
    if(arrayList.size() >= 3){
        long count=0;
        long sum=0;
        for (Long ele:arrayList){
            count++;
            sum += ele;
        }
        double avg = (double) sum/count;
        out.collect(new Tuple2<Long,Double>(element.f0,avg));
        mapState.clear();
    }
}

Наша процедура такова. Мы используем UUID в ключе, потому что UUID не будет повторяться. Хотя эта операция заставляет нас не знать, как выглядит ключ, мы вообще не заботимся о ключе. Так что в принципе не важно.

Затем мы оцениваем, больше ли длина Arraylist 3, и если она больше 3, мы можем выполнить наш алгоритм для вычисления среднего значения. В этот момент мы получаем результат операции, который является нормальным

В это время некоторые друзья скажут: эй, твой результат слишком фальшивый, почему ключ после UUID все еще такой нормальный номер?

На этом этапе давайте отметим, что ключ, который я взял, был вовсе не ключом, который я использовал для UUID, а ключом, который я принес, когда пришли данные, так что это нормально.

2.5 Используйте ReductionState для реализации функции оператора суммы

ReductionState имеет эффект агрегирования, поэтому он может имитировать накопление суммы и эффект конечного результата.

2.5.1 Преамбула

На данный момент мы не находимся в фиксированной процедуре.Сначала у нас будет дескриптор ReductionStateDescriptor для получения описания, и в дополнение к имени мы реализуем интерфейс ReduceFunction, а затем есть тип данных Long.class

По сути, по сравнению с предыдущей, это всего лишь еще одна операция по реализации интерфейса.

public class SumFunction
    extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        // 这个东西是拿来保存同一个key累加的值
        ReducingState<Long> reducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                    "sum",//状态的名字
                    new ReduceFunction<Long>() {//聚合函数
                        @Override
                        public Long reduce(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
                    }, Long.class);
            reducingState = getRuntimeContext().getReducingState(descriptor);

        }

Тогда это легко

@Override
public void flatMap(Tuple2<Long, Long> element,
                    Collector<Tuple2<Long, Long>> out) throws Exception {
    //将数据放到状态中
    reducingState.add(element.f1);
    out.collect(new Tuple2(element.f0,reducingState.get()));

}

2.6 AggregatingState

В это время мы хотим реализовать такую ​​функцию.Например, наши данные все те же.Эффект, которого мы хотим добиться,

(1,Contains:3 and 5 and 7)
(2,Contains:4 and 2 and 5)

Это функция для записи всех данных, что один и тот же ключ появился. Этот оператор в основном самый сложный 😂, если предыдущий слишком сложен, вы можете пропустить его и сразу перейти к следующему примеру требования симуляции. Этот пример будет подробно объяснен. Мы будем использовать это состояние позже, поэтому мы не буду спешить, расскажи потом

2.6.1 Преамбула

Во-первых, ключ имеет тип long, но вывод становится String, поэтому мы используем Tuple в качестве вывода.

В настоящее время наш класс описания немного отличается от обычного, нажмите на исходный код, чтобы увидеть

Опубликовать на Baidu Translate

Первый — тип входных данных, третий — тип выходных данных, а средний — вспомогательная переменная для накопления.На данный момент вам необходимо реализовать такой интерфейс, как new AggregateFunction(), и вы обнаружите, что у вас есть 4 вещи, которые нужно реализовать сразу.

createAccumulator---创建一个累加变量
    我们就是想把结果给一个一个拼起来,这个东西充当String str = ""的作用
add---拼接的作用
    return accumulator+" and "+value;也就是Contains:+value

merge---这玩意在这里没用

getResult---得出结果
    此时就是将accumulator+" and "+value的最终值给输出出来

В начале разработки AggregatingState слияние было разработано для того, чтобы объяснить, что результаты, рассчитанные в разных задачах, необходимо объединять посредством операции слияния, но наше текущее требование состоит в том, чтобы один и тот же ключ был в одной и той же задаче, а это значит, что мы не может возникнуть ситуация, что один и тот же ключ и разные значения, рассчитанные разными задачами, нужно объединить.Даже если задач 3, наш ключ 1 будет в задаче1, а ключ 2 будет в задаче2

Так что на данный момент эта вещь действительно бесполезна

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ContainsValueFunction
        extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,String>> {
    AggregatingState<Long, String> aggregatingState;
    @Override
    public void open(Configuration parameters) throws Exception {
        AggregatingStateDescriptor<Long, String, String> descriptor = new AggregatingStateDescriptor<>(
                "totalStr", //状态的名字
                new AggregateFunction<Long, String, String>() {
                    @Override
                    public String createAccumulator() {
                        return "Contains:";
                    }

                    @Override
                    public String add(Long value, String accumulator) {
                        if("Contains:".equals(accumulator)){
                            return accumulator+value;

                        }

                        return accumulator+" and "+value;
                    }

                    @Override
                    public String getResult(String s) {
                        return s;
                    }

                    @Override
                    public String merge(String accumulator1, String accumulator2) {
                        if("Contains:".equals(accumulator1)){
                            return accumulator2;
                        }
                        if("Contains:".equals(accumulator2)){
                            return accumulator1;
                        }
                        String[] fields = accumulator1.split(":");
                        return accumulator2+fields[1];
                    }
                }, String.class);

        aggregatingState = getRuntimeContext().getAggregatingState(descriptor);

    }

Работа в FlatMap состоит в том, чтобы сохранить данные в агрегировании, когда она приходит, а затем выводится ключ данных и элементы, извлеченные из агрегата.

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out) throws Exception {
        aggregatingState.add(element.f1);
        out.collect(new Tuple2<>(element.f0,aggregatingState.get()));
    }
}

2.7 FoldingState

2.8 Моделирование спроса

Данные о номере заказа в двух потоках объединены вместе, и журналы, напечатанные разными линиями трафика, могут не совпадать, поэтому у нас на самом деле есть ряд возможностей столкнуться с этой необходимостью помещать данные разных бизнес-линий. является эффектом ETL в реальном времени, чем ETL в реальном времени.

Так почему бы нам не присоединиться? Не забывайте, что это сцена в реальном времени, а данные приходят быстро или медленно, конечно, какие-то специальные сцены и методы мы здесь рассматривать не будем.

данные orderinfo1, данные есть в топике в Kafka

订单号:123,商品名:拖把,价格:30.0
订单号:234,商品名:牙膏,价格:20.0
订单号:345,商品名:被子,价格:114.4
订单号:333,商品名:杯子,价格:112.2
订单号:444,商品名:Mac电脑,价格:30000.0

данные orderinfo2, тоже в другой теме кафки

订单号:123,下单时间:2019-11-11 10:11:12,下单地点:江苏
订单号:234,下单时间:2019-11-11 11:11:13,下单地点:云南
订单号:345,下单时间:2019-11-11 12:11:14,下单地点:安徽
订单号:333,下单时间:2019-11-11 13:11:15,下单地点:北京
订单号:444,下单时间:2019-11-11 14:11:16,下单地点:深圳

Вывод такой: (123, швабра, 30.0, 2019-11-11 10:11:12, Цзянсу)

2.8.1 Установка полки

В любом случае, эти две строки кода все равно не запускаются.

public class OrderETLStream {
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.execute("OrderETLStream");
}

2.8.2 Два потока данных

Конечно, сама Kafka используется для предоставления данных, но теперь у меня нет локальной среды, поэтому я сначала использую пользовательский источник данных для моделирования с объектно-ориентированным мышлением.

orderInfo1 и orderInfo2 очень просты, то есть определяют поля и предоставляют методы построения, геттер, сеттер, базовые методы toString.

orderInfo1.java

public class OrderInfo1 {
    //订单号
    private Long orderId;
    //商品
    private String productName;
    //价格
    private double price;

    public static OrderInfo1 line2Info1(String line){
        String[] fields = line.split(",");
        OrderInfo1 orderInfo1 = new OrderInfo1();
        orderInfo1.setOrderId(Long.parseLong(fields[0]));
        orderInfo1.setProductName(fields[1]);
        orderInfo1.setPrice(Double.parseDouble(fields[2]));
        return orderInfo1;
    }

    public OrderInfo1(){
    }

    @Override
    public String toString() {
        return "OrderInfo1{" +
                "orderId=" + orderId +
                ", productName='" + productName + '\'' +
                ", price=" + price +
                '}';
    }

    public OrderInfo1(Long orderId, String productName, double price) {
        this.orderId = orderId;
        this.productName = productName;
        this.price = price;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

orderInfo2

public class OrderInfo2 {
    //订单号
    private Long orderId;
    //下单日期
    private String orderDate;
    //下单的地点
    private String address;

    public static OrderInfo2 line2Info2(String line){
        String[] fields = line.split(",");
        OrderInfo2 orderInfo2 = new OrderInfo2();
        orderInfo2.setOrderId(Long.parseLong(fields[0]));
        orderInfo2.setOrderDate(fields[1]);
        orderInfo2.setAddress(fields[2]);
        return orderInfo2;
    }

    public OrderInfo2(){

    }

    @Override
    public String toString() {
        return "OrderInfo2{" +
                "orderId=" + orderId +
                ", orderDate='" + orderDate + '\'' +
                ", address='" + address + '\'' +
                '}';
    }

    public OrderInfo2(Long orderId, String orderDate, String address) {
        this.orderId = orderId;
        this.orderDate = orderDate;
        this.address = address;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public String getOrderDate() {
        return orderDate;
    }

    public void setOrderDate(String orderDate) {
        this.orderDate = orderDate;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

2.8.3 Реализация источников данных

В этом нет ничего сложного, после реализации интерфейса SourceFunction нам нужно реализовать два метода, один запускается, а другой отменяется.

Первый параметр - FilePath, почему вам это нужно? Поскольку у нас есть два источника данных, так как мы различаем, какой из них есть? Функция этого параметра - это отличить функцию. В это время нам нужно пройти это значение через конструктор класса

После этого логика метода отмены очень проста, мы должны создать поток для чтения нашего файла, когда поток пуст, просто закрываем его.

Логика метода run несложная.Первый - прочитать файл.Когда данные строки не пустые, выполнить .collect(line) для отправки данных вниз по течению.Конечно, это слишком быстро.Для эффект, я даю ему остановиться., ради правды, я устанавливаю случайное значение (в пределах 0~500 миллисекунд), это вся логика нашего кода

FileSource.java

public class FileSource implements SourceFunction<String> {
    private String filePath;
    BufferedReader reader;
    Random random=new Random();
    public FileSource(String filePath){
        this.filePath=filePath;
    }

    @Override
    public void run(SourceContext<String> sct) throws Exception {
      reader = new BufferedReader(
                new InputStreamReader(
                        new FileInputStream(filePath)));
      String line=null;
      while((line = reader.readLine()) != null){
          //模拟数据源源不断的感觉,所以我让线程sleep一下
          TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
          sct.collect(line);
      }

    }

    @Override
    public void cancel() {
        try{
            if(reader == null ){
                reader.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

2.8.4 Постоянный класс

Обычно мы также определяем класс для размещения наших констант.Конечно, этот пример очень простой, так что это не большая проблема.В реальном процессе разработки это рекомендуется делать.

Constants.java

public class Constants {
    public static final String ORDER_INFO1_PATH="I:/OrderInfo1.txt";
    public static final String ORDER_INFO2_PATH="I:/OrderInfo2.txt";
}

2.8.5 Назад к OrderETLStream

Первый — получение двух потоков данных

DataStreamSource<String> info1Stream = env.addSource(new FileSource(Constants.ORDER_INFO1_PATH));
DataStreamSource<String> info2Stream = env.addSource(new FileSource(Constants.ORDER_INFO2_PATH));

В этот момент мы подходим к чтению содержимого файла, но не перед лицом этих строк это не очень хорошая операция, тогда мы хотим превратить их в нашу строку над определенными объектами для манипулирования, конечно, внимательный младший партнер заметил, что мой orderInfo1 и orderInfo2 есть статический метод, статический метод помогает нам вырезать строку, а затем преобразовать в соответствующий тип поля объекта

После этого я использовал лямбда выражение.

SingleOutputStreamOperator<OrderInfo1> orderInfo1Stream = info1Stream
            .map(line -> OrderInfo1.line2Info1(line));

SingleOutputStreamOperator<OrderInfo2> orderInfo2Stream = info2Stream
            .map(line -> OrderInfo2.line2Info2(line));

После этого мы используем keyBy для группировки этих двух источников данных и берем порядковый номер для ключевого поля.

KeyedStream<OrderInfo1, Long> keyByInfo1 = orderInfo1Stream
        .keyBy(orderInfo1 -> orderInfo1.getOrderId());

KeyedStream<OrderInfo2, Long> keyByInfo2 = orderInfo2Stream
        .keyBy(orderInfo2 -> orderInfo2.getOrderId());

Затем используйте соединение, чтобы соединить его вместе, но мы рассматриваем проблему временной последовательности и непосредственно выполняем соединение, эффект операции может быть неправильным, поэтому мы должны использовать состояние, которое мы только что узнали, здесь мы берем наиболее распространенное значение состояния для полный

2.8.6

Обратите внимание, что здесь используется RichCoFlatMapFunction, о котором упоминалось в предыдущей статье.Для двух разных источников данных мы используем co, а тип выходных данных — Tuple2, потому что это комбинация двух.

Следующая подпрограмма — это старая операция до того, как сначала открывается, сначала описывается, затем регистрируется состояние после описания и используется после регистрации.

flatMap1, и flatMap2, если в flatMap есть данные 123, а если во втором потоке данные 2, 1, то первый поток может объединить второй 123, но если первый поток придет первым Данные 123. Если второй поток не прибыл, то используйте обновление, чтобы сохранить его.Второй поток такой же, и первый поток оценивается. Это наша логика

EnrichmentFunction.java

/**
 * IN1 第一个类的数据类型
 * IN2 第二个流的数据类型
 * OUT 输出的数据类型
 */
public class EnrichmentFunction
        extends RichCoFlatMapFunction<OrderInfo1,OrderInfo2,
        Tuple2<OrderInfo1,OrderInfo2>> {
    //同一个订单号


    private ValueState<OrderInfo1> orderInfo1ValueState;
    private ValueState<OrderInfo2> orderInfo2ValueState;
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<OrderInfo1> descriptor1 = new ValueStateDescriptor<>(
                "info1",
                OrderInfo1.class
        );
        ValueStateDescriptor<OrderInfo2> descriptor2 = new ValueStateDescriptor<>(
                "info2",
                OrderInfo2.class
        );
        orderInfo1ValueState = getRuntimeContext().getState(descriptor1);
        orderInfo2ValueState = getRuntimeContext().getState(descriptor2);
    }
    //第一个流的 key

    //123
    //123
    @Override
    public void flatMap1(OrderInfo1 orderInfo1,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
    //这个方法要是被运行,那说明第一个流肯定是来数据了。
        OrderInfo2 value2 = orderInfo2ValueState.value();
        if(value2 != null ){
            orderInfo2ValueState.clear();
            out.collect(Tuple2.of(orderInfo1,value2));
        }else{
            orderInfo1ValueState.update(orderInfo1);
        }
    }
    //第二个流的key
    @Override
    public void flatMap2(OrderInfo2 orderInfo2,
                         Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
        OrderInfo1 value1 = orderInfo1ValueState.value();
        if(value1 != null){
            orderInfo1ValueState.clear();;
            out.collect(Tuple2.of(value1,orderInfo2));
        }else{
            orderInfo2ValueState.update(orderInfo2);

        }
    }
}

Выполните код в это время, запускайте нормально, очень хорошо

finally

Только что продемонстрированные состояния — это практики, упомянутые на официальном сайте, я думаю, что обучение должно начинаться с официального сайта, поэтому я взял эти примеры и упомянул их.

Продолжим кодить в следующей статье🤣