Время события Flink, водяной знак и поздняя обработка данных

Java

Время события и водяной знак

Так называемое время события — это метка времени, записанная самим элементом данных в Flink DataStream, когда оно действительно происходит, имеет бизнес-значение и не зависит от системного времени. Очевидно, что поскольку данные, сгенерированные внешней системой, часто не могут своевременно и по порядку попасть в систему Flink, время события более непредсказуемо, чем время обработки. Для точного отображения хода обработки времени события необходимо использовать водяной знак.

Flink DataStream - это сущность водяного знака в специальном элементе, каждый водяной знак несет срок. Когда The Time для водяного знака появляется t, событие представляет собой время T T. Другими словами, водяной знак - критерий сгибания поздних данных, но и отметки окна триггера.

Чтобы визуализировать роль водяных знаков, рассмотрите следующую диаграмму, которая является примером неупорядоченного потока данных, основанного на времени события.

https://www.ververica.com/blog/how-apache-flink-enables-new-streaming-applications-part-1 Квадраты на рисунке — это элементы данных, числа в них представляют время события, а W( x) is Указывает, что отметка времени является водяным знаком x и имеет скользящее окно длиной 4 единицы времени. Принимая за единицу времени секунды, можно видеть, что элементы со временем события 2, 3 и 1 с войдут в окно с интервалом [1 с, 4 с], а элементы со временем события 7 с войдут в окно с интервалом [1 с, 4 с]. интервал [5 с, 8 с]. Когда приходит водяной знак W(4), это означает, что элементов с t

Однако на данный момент на рисунке не показаны более поздние данные. Если элемент со временем события 6 появляется после W(9), он опаздывает. Обработка поздних данных будет рассмотрена позже.

Приведенный выше пример имеет только одну степень параллелизма, тогда в случае нескольких степеней параллелизма будет несколько потоков, генерирующих водяные знаки Какой водяной знак следует использовать при срабатывании окна? Ответ — тот, у которого наименьшая метка времени среди всех входящих водяных знаков. График из официальной документации может проиллюстрировать проблему.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html легко понять, если все входящие водяные знаки с наименьшей отметкой времени достигли или превысили время окончания окна , то данные всех потоков должны быть собраны, и расчет окна можно безопасно запускать.

Извлечь время события, создать водяной знак

Намного выше, то как время события извлекается из данных, как находится водяной знак? Flink предоставляет метод Unified DataStream.assigntimestampsandwattermarks () для извлечения времени события и генерировать водяные знаки одновременно, в конце концов, они тесно связаны во время обработки.

Типы параметров, принимаемые методом assignTimestampsAndWatermarks(), — это AssignerWithPeriodicWatermarks и AssignerWithPunctuatedWatermarks, которые соответствуют периодическим водяным знакам и точечным (то есть инициируемым свойствами самого события) водяным знакам. Их диаграммы классов показаны ниже.

Периодический водяной знак

Как следует из названия, при использовании AssignerWithPeriodicWatermarks водяные знаки создаются периодически. Период по умолчанию составляет 200 мс, а новый период также можно указать с помощью метода ExecutionConfig.setAutoWatermarkInterval().

Из диаграммы классов легко увидеть, что нам нужно извлечь время события, реализуя метод extractTimestamp(), и реализовать метод getCurrentWatermark() для создания водяного знака. Но, к счастью, Flink предоставляет 3 встроенных класса реализации, поэтому мы можем использовать их напрямую, избегая проблем.

AscendingTimestampExtractor всегда говорит сухо (?), давайте посмотрим на код. открытый абстрактный длинный экстракт AscendingTimestamp (элемент T);

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }
    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
AscendingTimestampExtractor产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有两种方式:打印警告日志(默认)和抛出RuntimeException。

Монотонно увеличивающееся время события и не соответствующее реальной ситуации, поэтому AscendingTimestampExtractor использовал много.

BoundedOutOfOrdernessTimestampExtractor имеет очень высокую скорость появления. Или сначала посмотрите код. public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds()

    public abstract long extractTimestamp(T element);
    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
如名字所述,BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

Конечно, продолжительность интервала нарушения порядка должна быть тщательно установлена ​​в соответствии с реальной средой.Если он установлен слишком коротким, будет потеряно больше данных, а если он установлен слишком длинным, окно вызовет задержку и производительность в реальном времени будет ослаблена.

IngestionTimeExtractor@Overridepublic long extractTimestamp(T element, long previousElementTimestamp) {final long now = Math.max(System.currentTimeMillis(), maxTimestamp);maxTimestamp = now;return now;}

    @Override
    public Watermark getCurrentWatermark() {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }
IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。

водяной знак

Точечные водяные знаки используют гораздо меньше, чем периодические водяные знаки, а у Flink нет встроенной реализации, так что напишем самый простой каштан.

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
      @Nullable
      @Override
      public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
        return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
      }
      @Override
      public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
        return element.getTimestamp();
      }
    });
AssignerWithPunctuatedWatermarks适用于需要依赖于事件本身的某些属性决定是否发射水印的情况。我们实现checkAndGetNextWatermark()方法来产生水印,产生的时机完全由用户控制。上面例子中是收取到用户ID末位为0的数据时才发射。

Напомню еще три момента:

Независимо от того, какой метод используется для создания водяного знака, он не должен быть слишком частым. Поскольку все объекты водяных знаков будут перемещаться вниз по течению и фактически будут занимать память, слишком большое количество водяных знаков приведет к снижению производительности системы. Создание водяного знака должно производиться как можно раньше, как правило, после доступа к Источнику или после того, как Источник подвергнется простому преобразованию (сопоставление, фильтрация и т. д.). Если заказчик не заботится о деловой значимости переноса времени события, время обработки может быть использовано напрямую, что просто и удобно. поздняя обработка данных

Как упоминалось выше, интервал не по порядку водяного знака может гарантировать, что некоторые просроченные данные не будут отброшены, но интервал не по порядку часто не очень велик. Есть два способа получить итоговую прибыль, которую можно назвать второй гарантией, которую Flink предоставляет для просроченных данных.

окно разрешить задержку

Flink предоставляет метод WindowedStream.allowedLateness() для установки допустимой задержки окна. Другими словами, при нормальных обстоятельствах окно будет уничтожено после завершения расчета триггера, но после установки допустимой задержки окно будет ожидать время допустимой задержки, прежде чем будет уничтожено. Поздние данные в пределах этого интервала все еще могут попасть в окно и запустить новый расчет. Конечно, окно также является большим ресурсом, поэтому значение allowLateness должно быть соответствующим. Полный пример кода приведен ниже.

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......
allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。之前讲DataFlow模型时提到过,不废话了。

боковой вывод поздних данных

Боковой выход - это разгрузочный механизм Flink. Сами просроченные данные можно рассматривать как особый поток, мы отправляем просроченные данные в побочный поток вывода указанного OutputTag, вызывая метод WindowedStream.sideOutputLateData(), после чего переходим к следующему шагу (например, сохранению во внешнее хранилище). или очередь сообщений). код показывает, как показано ниже.

      // 侧输出的OutputTag
      OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(lateOutputTag)   // 侧输出
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......
      // 获取迟到数据并写入对应Sink
      stream.getSideOutput(lateOutputTag).addSink(lateDataSink);

The End

Автор: LittleMagic Ссылка: https://www.jianshu.com/p/c612e95a5028

Обратите внимание на мой официальный аккаунт и ответьте на [JAVAPDF] в фоновом режиме, чтобы получить 200 страниц тестовых вопросов!Большие данные, на которые обращают внимание 50 000 человек, — это дорога к Богу, почему бы вам не прийти и не узнать об этом?50 000 человек обращают внимание на то, как большие данные становятся богом, разве вы не хотите узнать об этом?50 000 человек обращают внимание на то, как стать богом больших данных, вы уверены, что действительно не хотите прийти и узнать об этом?

приветствую ваше внимание«Дорога к большим данным становится Богом»

大数据技术与架构