Серия Flink (7) — управление состоянием Flink и механизм контрольных точек

Flink

1. Классификация статуса

По сравнению с другими платформами потоковых вычислений важной особенностью Flink является поддержка вычислений с отслеживанием состояния. То есть вы можете сохранить промежуточные результаты расчета и предоставить их для последующих расчетов:

https://github.com/heibaiying

В частности, Flink делит состояние на Keyed State и Operator State:

2.1 Состояние оператора

Состояние оператора: как следует из названия, состояние привязано к оператору, и другие операторы не могут получить доступ к состоянию оператора. Объяснение состояния оператора в официальной документации:each operator state is bound to one parallel operator instance, поэтому точнее сказать, что состояние оператора привязано к параллельному экземпляру оператора, то есть, если параллелизм оператора равен 2, то у него должно быть два соответствующих состояния оператора:

https://github.com/heibaiying

2.2 Состояние ключа

Состояние с ключом: это особое состояние оператора, то есть состояние различается в соответствии со значением ключа, и Flink будет поддерживать экземпляр состояния для каждого типа значения ключа. Как показано на рисунке ниже, каждый цвет представляет отдельное значение ключа, соответствующее четырем разным экземплярам состояния. Обратите внимание, что ключевое состояние может быть толькоKeyedStreamчтобы использовать его, мы можем передатьstream.keyBy(...)получитьKeyedStream.

https://github.com/heibaiying

2. Государственное программирование

2.1 Состояние ключа

Flink предоставляет следующие форматы данных для управления и хранения Keyed State:

  • ValueState: Сохраняет состояние одного типа значения. можно использоватьupdate(T)обновить и пройтиT value()искать.
  • ListState: Сохраняет состояние типа списка. можно использоватьadd(T)илиaddAll(List)добавить элементы и передатьget()Получить весь список.
  • ReducingState: используется для хранения результата, вычисленного с помощью функции ReduceFunction, используйтеadd(T)Добавьте элементы.
  • AggregatingState: используется для хранения результата, рассчитанного AggregatingState, используйтеadd(IN)Добавьте элементы.
  • FoldingState: Помечен как устаревший и будет удален в будущих версиях.Официально рекомендуется использоватьAggregatingStateзаменять.
  • MapState: поддерживает состояние типа карты.

Все вышеперечисленные добавления, удаления, модификации и методы поиска не нужно запоминать, и их можно вызвать с помощью синтаксических подсказок при их использовании. Вот конкретный пример использования: Предположим, мы разрабатываем систему мониторинга.Когда данные мониторинга превышают пороговое значение определенное количество раз, необходимо выдать аварийное сообщение. Причина, по которой он должен достигать определенного количества раз, заключается в том, что по случайным причинам случайное превышение порога один раз ничего не значит, поэтому для срабатывания тревоги необходимо достичь определенного количества раз, что требует использования Государственное программирование Флинка. Соответствующий код выглядит следующим образом:

public class ThresholdWarning extends 
    RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    // 通过ListState来存储非正常数据的状态
    private transient ListState<Long> abnormalData;
    // 需要监控的阈值
    private Long threshold;
    // 触发报警的次数
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) {
        // 通过状态名称(句柄)获取状态实例,如果不存在则会自动创建
        abnormalData = getRuntimeContext().getListState(
            new ListStateDescriptor<>("abnormalData", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
        throws Exception {
        Long inputValue = value.f1;
        // 如果输入值超过阈值,则记录该次不正常的数据信息
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的数据出现达到一定次数,则输出报警信息
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超过指定阈值 ", list));
            // 报警信息输出后,清空状态
            abnormalData.clear();
        }
    }
}

Вызов пользовательского мониторинга состояния, здесь мы используем A и B для представления различных типов данных мониторинга, отслеживаем его данные:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .keyBy(0)
    .flatMap(new ThresholdWarning(100L, 3))  // 超过100的阈值3次后就进行报警
    .printToErr();
env.execute("Managed Keyed State");

Результат выглядит следующим образом:

https://github.com/heibaiying

2.2 Срок действия статуса

Любой из вышеперечисленных типов ключевого состояния поддерживает настройку периода действия (TTL), примеры следующие:

StateTtlConfig ttlConfig = StateTtlConfig
    // 设置有效期为 10 秒
    .newBuilder(Time.seconds(10))  
    // 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    /*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,
     代表即使值过期了,但如果还没有被物理删除,就是可见的*/
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);

2.3 Состояние оператора

По сравнению с состоянием с ключом, состояние оператора в настоящее время поддерживает только следующие три типа хранения:

  • ListState: Сохраняет состояние типа списка.
  • UnionListState: Хранит состояние типа списка. Разница из ListState заключается в том, что если степень параллелизма изменяется, ListState будет объединять все одновременные случаи состояния оператора, а затем распределяют их одинаково к новым задачам; экземпляры агрегированы, а также специфические задачи; Поведение разделения определяется пользователем.
  • BroadcastState: Состояние оператора, используемое для трансляции.

Здесь мы продолжаем использовать приведенный выше пример, предполагая, что нам не нужно различать типы данных мониторинга в это время, пока данные мониторинга превышают пороговое значение и достигают заданного количества раз, будет выдаваться сигнал тревоги. код выглядит следующим образом:

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, 
Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

    // 非正常数据
    private List<Tuple2<String, Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String, Long>> checkPointedState;
    // 需要监控的阈值
    private Long threshold;
    // 次数
    private Integer numberOfTimes;

    ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意这里获取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
            getListState(new ListStateDescriptor<>("abnormalData",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                })));
        // 如果发生重启,则需要从快照中将状态进行恢复
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, 
                        Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
        Long inputValue = value.f1;
        // 超过阈值则进行记录
        if (inputValue >= threshold) {
            bufferedData.add(value);
        }
        // 超过指定次数则输出报警信息
        if (bufferedData.size() >= numberOfTimes) {
             // 顺便输出状态实例的hashcode
             out.collect(Tuple2.of(checkPointedState.hashCode() + "阈值警报!", bufferedData));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在进行快照时,将数据存储到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}

Чтобы вызвать пользовательское состояние оператора, вам нужно установить параллелизм равным 1:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启检查点机制
env.enableCheckpointing(1000);
// 设置并行度为1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
    .flatMap(new ThresholdWarning(100L, 3))
    .printToErr();
env.execute("Managed Keyed State");
}

На данный момент вывод следующий:

https://github.com/heibaiying

В приведенном выше вызывающем коде мы установили параллелизм программы на 1, и мы видим, что все хэш-коды экземпляров состояния в трех выходных данных согласованы, доказывая, что все они являются одним и тем же экземпляром состояния. Если предположить, что степень параллелизма установлена ​​равной 2, результат будет следующим:

https://github.com/heibaiying

Видно, что хэш-коды экземпляров состояния в двух выходных данных несовместимы, что означает, что они не являются одним и тем же экземпляром состояния, что также упоминалось выше.Состояние оператора привязано к параллельному экземпляру оператора. При этом выводится только два раза, потому что в случае параллельной обработки поток 1 может получить 5 аномальных значений, а поток 2 может получить 4 аномальных значения, потому что для вывода требуется более 3 раз, поэтому в этом случае В этом случае будут выведены только две записи, поэтому нужно установить параллелизм программы на 1.

3. Механизм контрольно-пропускного пункта

3.1 CheckPoints

Чтобы состояние Flink имело хорошую отказоустойчивость, Flink предоставляет механизм контрольных точек (CheckPoints). Через механизм контрольных точек Flink периодически создает барьеры контрольных точек в потоке данных.Когда оператор получает барьер, он создает моментальный снимок на основе текущего состояния, а затем передает барьер нижестоящему оператору, и нижестоящий оператор получает После достижения барьера также генерируется снимок на основе текущего состояния, который по очереди передается последнему стоковому оператору. При возникновении исключения Flink может восстановить все операторы в предыдущее состояние на основе последних данных моментального снимка.

https://github.com/heibaiying

3.2 Включить контрольную точку

По умолчанию механизм чекпоинта выключен и его нужно включить в программе:

// 开启检查点机制,并指定状态检查点之间的时间间隔
env.enableCheckpointing(1000); 

// 其他可选配置如下:
// 设置语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置两个检查点之间的最小时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置执行Checkpoint操作时的超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置最大并发执行的检查点的数量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 将检查点持久化到外部存储
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存点时,是否将作业回退到该检查点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

3.3 Механизм точки сохранения

Механизм точек сохранения (Savepoints) — это специальная реализация механизма контрольных точек, которая позволяет запускать контрольные точки вручную и постоянно сохранять результат по указанному пути, что в основном используется для избежания перезапуска или обновления кластера Flink. потеря состояния. Пример выглядит следующим образом:

# 触发指定id的作业的Savepoint,并将结果存储到指定目录下
bin/flink savepoint :jobId [:targetDirectory]

Для получения дополнительных команд и конфигураций обратитесь к официальной документации:savepoints

4. Бэкенд статуса

4.1 Классификация государственных менеджеров

По умолчанию все состояния хранятся в куче памяти JVM. В случае слишком большого количества данных состояния этот метод может вызвать переполнение памяти. Поэтому Flink должен предоставлять другие способы хранения данных состояния. Эти методы хранения в совокупности называются бэкэнды состояний (или менеджеры состояний):

https://github.com/heibaiying

Существует три основных типа:

1. MemoryStateBackend

Метод по умолчанию, основанный на куче памяти JVM для хранения, в основном подходит для локальной разработки и отладки.

2. FsStateBackend

Хранилище основано на файловой системе, которая может быть локальной файловой системой или распределенной файловой системой, такой как HDFS. Следует отметить, что хотя выбран FsStateBackend, текущие данные все равно хранятся в памяти TaskManager, а снимок состояния будет записываться в указанную файловую систему только в контрольной точке.

3. RocksDBStateBackend

ROCKSDBSTATEBACKEND ROCKSDBSTATEBACKEND встроена в третий штатный менеджер, встроенная база данных ROCKSDB-значения клавишных данных хранится. К тому времени контрольно-пропускной пункт, затем данные в нем в указанную постоянную файловую систему, необходимость настроить ROCKSDBSTATEBACKEND при использовании постоянной файловой системы хранения. Это сделано, потому что ROCKSDB в качестве встроенной безопасности базы данных относительно низкая, но по сравнению с тем, как всю файловую систему, скорость чтения быстрее; по сравнению с тем, как память полна, большая память, так что это более сбалансированная программа Отказ

4.2 Метод настройки

Flink поддерживает два способа настройки бэкэнд-менеджера:

первый способ: настроить на основе кода и применить только к текущему заданию:

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

При настройке RocksDBStateBackend необходимо импортировать следующие дополнительные зависимости:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

второй способ: на основеflink-conf.yamlФайл конфигурации настроен и действует для всех заданий, развернутых в кластере:

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

Примечание. Адрес загрузки всех примеров кодов в этой статье:flink-state-management

использованная литература

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным