предисловие
Прошло некоторое время с момента последней статьи Flink.В то время некоторые операторы были на всем протяжении, так что прежде чем войти в эту статью, давайте разминемся и вспомним код
Теперь мы хотим реализовать такую функцию, которая также является подсчетом слов, но этот подсчет слов должен быть реализован, определено пользовательское пороговое значение и функция печати выполняется каждый раз при достижении порогового значения. Если у вас уже есть определенное представление о Flink, вы должны знать, что нам нужно только настроить нижестоящий
public class TestOperatorState {
public static void main(String[] args) throws Exception{
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStreamSource<Tuple2<String,Integer>> dataStreamSource = env.fromElements(
Tuple2.of("spark",3),
Tuple2.of("kafka",3),
Tuple2.of("flink",3),
Tuple2.of("hive",3),
Tuple2.of("hbase",3),
Tuple2.of("es",3)
);
dataStreamSource.addSink(new MySink(2));
env.execute("TestOperatorState");
}
}
Затем реализуйте эту функцию через MySink
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.util.ArrayList;
import java.util.List;
public class MySink implements SinkFunction<Tuple2<String,Integer>> {
private List<Tuple2<String,Integer>> bufferElements;
// 定义一个阈值
private int threshold;
public MySink(int threshold){
this.threshold = threshold;
bufferElements = new ArrayList<Tuple2<String, Integer>>();
}
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
bufferElements.add(value);
if (bufferElements.size() == threshold){
System.out.println("数据:"+bufferElements);
bufferElements.clear();
}
}
}
Он всегда сообщает об этом Не удалось загрузить класс "org.slf4j.impl.StaticLoggerBinder" во время выполнения. Если вам это не очень нравится, вы также можете добавить этот pom
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
Но это превратит исходный вывод из 3-х предложений в экран, полный необъяснимых логов, так что считайте сами
Однако с этой программой явно есть проблема.Поскольку данные хранятся в памяти, они будут потеряны при перезапуске программы.Поэтому, чтобы обеспечить отказоустойчивость состояния, Flink необходимо делать контрольные точки состояния.
1. Содержание
1.1 Вернитесь к программе прямо сейчас
Если мы хотим сделать преобразование контрольной точки в программе, которую мы только что написали, мы должны дать ей контрольную точку в MySink.
private ListState<Tuple2<String,Integer>> checkPointState;
Затем реализуйте интерфейс CheckpointedFunction.У этого интерфейса есть два метода, которые необходимо реализовать, один — snapshotState, а другой — initializeState.Оба их очень легко понять, когда они переведены с английского. Один из них — сделать снимок состояния, а другой — инициализировать состояние, как показано на следующем рисунке.
Тогда, если вы видели насВзгляните на различные состояния FlinkПарни на
ок, в это время мы тоже рисуем тыкву, initializeState — это метод восстановления данных при перезапуске. Следует отметить, что тип данных, используемый здесь, должен быть специально запомнен.Эта контрольная точка на самом деле предназначена для поддержания специального состояния для записи своего состояния, которое совпадает с состоянием, замеченным ранее.
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<Tuple2<String, Integer>>(
"buffer", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
@Override
public TypeInformation<Tuple2<String, Integer>> getTypeInfo() {
return super.getTypeInfo();
}
})
);
checkPointState = context.getOperatorStateStore().getListState(descriptor);
// 如果任务重启
if (context.isRestored()){
for (Tuple2<String, Integer> lostData : checkPointState.get()) {
bufferElements.add(lostData);
}
}
}
Роль snapshotState состоит в том, чтобы через равные промежутки времени записывать данные из нашей памяти в bufferElements.
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkPointState.clear();
for (Tuple2<String, Integer> data : bufferElements) {
bufferElements.add(data);
}
}
Этот пример является исходным примером на официальном сайте, код точно такой же, но на самом деле мы обычно не пишем такой код, потому что в этом случае это равносильно тому, что я поддерживаю состояние для записи информации о это состояние, мы нажимаем. Подпрограмма должна быть передана Flink, чтобы управлять ею для нас.
Итак, где именно хранится состояние во Flink?
1.2 Где хранится состояние
Для StateBackend, поддерживаемого Flink, состояние по умолчанию сохраняется в следующих трех местах:
1.2.1 MemoryStateBackend
Состояние хранится в памяти, которая также является архитектурой master-slave, и FLINK запускает службу JobManager, а затем из TaskManager.Информация о состоянии хранится в динамической памяти диспетчера задач, а состояние сохраняется в динамической памяти диспетчера заданий при создании контрольной точки..
А еще мы можем вручную указать в программе, например вот эту
Так что если наша программа в это время зависнет, данные в памяти пропадут, поэтому для решения этой проблемы мы изменим этот параметр
1.2.2 FsStateBackend
FsStateBackend имеет некоторые оптимизации для предыдущего MemoryStateBackend, и его TaskManager будет периодически сохранять состояние в HDFS. То есть при чекпоинте сохранять состояние в указанный файл (HDFS и другие файловые системы)
env.setStateBackend(new FsStateBackend("hdfs path"));
Недостатки: Размер состояния ограничен памятью TaskManager (по умолчанию поддерживается 5M, что можно настроить).Если данные в памяти превысят это значение перед сохранением в HDFS, данные все равно будут потеряны. Преимущество в том, что работа с памятью, скорость доступа к состоянию очень быстрая
1.2.3 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("path"));
Это конфигурация, используемая в производственной среде.Информация о состоянии хранится в базе данных RocksDB (служба хранения данных с ключом и значением) и, наконец, сохраняется в локальном файле.
При сохранении состояния чекпоинта в указанный файл (файловая система HDFS и т.п.),
Недостатком является то, что скорость доступа к состоянию ниже, чем у FsStateBackend. Преимущества: может хранить большое количество информации о состоянии, потому что она также распределена
1.3 Введение в контрольно-пропускной пункт
Чтобы обеспечить отказоустойчивость состояния, Flink необходимо создать контрольную точку состояния.
Контрольная точка — это основная функция механизма отказоустойчивости Flink.Он может периодически создавать моментальные снимки на основе состояния каждого оператора/задачи в потоке в соответствии с конфигурацией, чтобы регулярно и постоянно сохранять эти данные о состоянии.При неожиданном сбое программы Flink , При повторном запуске программы можно выборочно восстанавливаться из этих снимков, тем самым исправляя аномалии данных программы, вызванные сбоями
Предпосылка, что механизм контрольных точек Flink может взаимодействовать с постоянным хранилищем (поток и состояние):
Постоянный источник, который должен поддерживать воспроизведение событий в течение определенного периода времени. Типичными примерами таких источников являются постоянные очереди сообщений (такие как Apache Kafka, RabbitMQ и т. д.) или файловые системы (такие как HDFS, S3, GFS и т. д.).
Для постоянного хранения данных о состоянии, таких как распределенные файловые системы (такие как HDFS, S3, GFS и т. д.)
В задаче Flink много задач, и все они генерируют множество состояний, которые периодически будут храниться в определенном месте, которое на рисунке представлено как checkPointState.
Просто выньте его, когда выздоровеете
1.4 Настройка параметров
Поскольку checkPoint не включен по умолчанию, нам нужно включить его, установив
env.enableCheckpointing(10000);
Здесь установлено значение checkPoint раз в 10 секунд. Но рекомендуемое значение составляет от 20 до 120 секунд.
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Вот режим, который устанавливает контрольную точку на одноразовую семантику, что также по умолчанию
Производительность хотя бы одного определенно будет, но данные могут повторяться, поэтому используется эталонная сцена, этот блок и потоковая передача Spark все еще различаются, потоковая передача искры выигрывает от механизма отказоустойчивости RDD, поэтому вы можете сделать точно_один раз
checkpointConfig.setMinPauseBetweenCheckpoints(500);
Это минимальный временной интервал между двумя чекпоинтами.В это время друзья могут немного запутаться.Когда я включал чекпоинт,я уже не давал 10 секунд как интервал выполнения чекпоинта?В чем смысл этого параметра ? Объясните здесь, что нам обязательно потребуется определенное количество времени для выполнения checkPoint. Например, мне потребовалось 10 секунд, чтобы выполнить checkPoint в этот раз, или я еще не закончил его выполнение. setMinPauseBetweenCheckpoints(500) означает, что я сделаю эти два чекпоинта.Операция будет иметь как минимум определенный временной интервал, подождите немного смысл последнего чекпоинта
checkpointConfig.setCheckpointTimeout(60000);
Устанавливается время тайм-аута чекпоинта, если текущий чекпоинт не заканчивается в течение одной минуты, то сдаемсяи выполнить следующую контрольную точку
checkpointConfig.setMaxConcurrentCheckpoints(1);
Сохранить количество контрольных точек, по умолчанию 1, то есть сохраняется только последний результат контрольной точки.
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Указывает, что после отмены обработчика Flink данные контрольной точки будут сохранены, чтобы их можно было восстановить до указанной контрольной точки в соответствии с фактическими потребностями.Вы также можете настроить автоматическое удаление контрольной точки при остановке программы.
1.5 Используйте CheckPoint для восстановления данных
Восстановление данных на самом деле представляет собой простую задачу перезапуска:
Flink поддерживает различные стратегии перезапуска для управления перезапуском заданий в случае сбоя.Кластер запускается со стратегией перезапуска по умолчанию, которая используется, когда не определена конкретная стратегия перезапуска. Если при отправке задания указана стратегия перезапуска, эта стратегия переопределит стратегию кластера по умолчанию.Стратегию перезапуска по умолчанию можно указать в файле конфигурации Flink flink-conf.yaml. Параметр конфигурации restart-strategy определяет используемую стратегию.
常用的重启策略
(1)固定间隔策略 (Fixed delay)
(2)失败率策略 (Failure rate)
(3)无重启 (No restart)
Если контрольные точки не включены, используется стратегия без перезапуска.
Если контрольные точки включены, но политика перезапуска не настроена, используется политика с фиксированной задержкой,
Значение по умолчанию количества попыток перезапуска: Integer.MAX_VALUE Политику перезапуска можно настроить в файле flink-conf.yaml, который представляет собой глобальную конфигурацию. Его также можно динамически указать в коде приложения, что переопределит глобальную конфигурацию.
1.5.1 Стратегия перезапуска с фиксированным интервалом
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
Мы также можем использовать код для настройки
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 每次重试的间隔
));
1.5.2 Стратегия частоты отказов (меньше сценариев)
Первое:
全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
Его также можно установить по коду
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
Здесь это означает, что разрешено 3 перезапуска в течение 5 минут.
1.5.3 Без перезагрузки
Когда задача зависнет, повесьте трубку, отпустите
第一种:全局配置 flink-conf.yaml
restart-strategy: none
Настройки кода
env.setRestartStrategy(RestartStrategies.noRestart());
Конечно, если мы разрабатываем ежедневно, невозможно установить глобальную конфигурацию, и мы все модифицируем эту настройку в соответствии с разными потребностями.
1.6 Настройки нескольких контрольных точек
По умолчанию, если установлен параметр «Контрольная точка», Flink сохраняет только самую последнюю успешно созданную контрольную точку, а в случае сбоя программы Flink он может восстановиться с самой последней контрольной точки. Однако, если мы хотим сохранить несколько контрольных точек и иметь возможность выбрать одну из них для восстановления в соответствии с реальными потребностями, это будет более гибко.Например, мы обнаруживаем, что существует проблема с обработкой записей данных в последних 4 часов, и мы надеемся восстановить все состояние Flink 4 часа назад.Можно сохранить несколько контрольных точек.
Вам необходимо добавить следующую конфигурацию в файл конфигурации Flink conf/flink-conf.yaml, чтобы указать максимальное количество сохраняемых контрольных точек.
state.checkpoints.num-retained: 20
Конечно, мы выберем настройку в коде, который просто
checkpointConfig.setMaxConcurrentCheckpoints(20)
На этом этапе, после автоматического перезапуска задачи, для восстановления данных используется последняя контрольная точка.
После этой настройки просмотрите каталог файлов, сохраненный соответствующей контрольной точкой на HDFS.
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
Если вы хотите вернуться к определенной контрольной точке, вам нужно только указать соответствующий путь контрольной точки для достижения этой цели.
1.7 Ручное восстановление данных через CheckPoint
Каждая из наших задач Flink будет иметь эксклюзивный JobID, а затем данные, которые мы сохраняем CheckPoint в HDFS, также будут называться в соответствии с этим JobId, что требует внимания. Если нам нужно вручную восстановить данные через контрольную точку, то нам нужно перейти в каталог HDFS, а затем найти нашу папку контрольной точки --- имя по умолчанию chk-xx, за которым следует число, указывающее, что текущая контрольная точка уже является сначала несколько. Команда выглядит следующим образом
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
Следует отметить, что мало найти chk-xx, нам нужно еще указать в нем _metadata, под ним также можно понимать информацию о метаданных данных
Конечно, с этим тоже будут проблемы, потому что Flink только что
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Это не сквозная одноразовая семантика.Когда бизнес меняется и нам нужно остановить задачу и изменить код, данные могут быть продублированы, когда мы снова выйдем в сеть.
Итак, на данный момент нам нужно использовать savePoint.Flink может использовать функцию Savepoint, чтобы продолжать выполнять вычисления с точки до обновления после обновления программы, чтобы гарантировать, что данные не будут прерваны.
Он может сохранять смещение источника данных, статус операции оператора и другую информацию и может продолжать потреблять с того момента, когда приложение выполнило какую-либо точку сохранения в прошлом.
1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:触发一个savepoint【直接触发或者在cancel的时候触发】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】
// 使用这个命令停止的Flink,会在退出前帮你再保存一个checkPoint
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数,也就是application_xxx】
3:从指定的savepoint启动job
bin/flink run -s savepointPath [runArgs]
Ручное выполнение пользователем является указателем на контрольную точку, срок действия которой не ограничен и используется в случае обновления.
Примечание. Для плавного обновления между разными версиями заданий и между разными версиями Flink настоятельно рекомендуется, чтобы программисты вручную назначали идентификаторы операторам с помощью метода uid(String).
Эти идентификаторы будут использоваться для определения диапазона состояний каждого оператора. Если вы не назначите идентификатор каждому оператору вручную, Flink автоматически создаст идентификатор для каждого оператора. Программу можно восстановить из точки сохранения, если эти идентификаторы не изменились. И эти автоматически сгенерированные идентификаторы зависят от структуры программы и чувствительны к изменениям кода. Поэтому настоятельно рекомендуется, чтобы пользователи устанавливали идентификатор вручную.
Finally
watermark