Углубленный анализ Apache-Flink — Kafka of DataStream-Connectors

Flink


Знакомство с Кафкой

Apache Kafka — это распределенная система обмена сообщениями с публикацией и подпиской. Первоначально он был разработан корпорацией LinkedIn, которая в 2010 году внесла свой вклад в Apache Foundation и стала проектом с открытым исходным кодом высшего уровня. Kafka используется для создания конвейеров данных в реальном времени и потоковых приложений. Он обладает горизонтальной масштабируемостью, отказоустойчивостью, чрезвычайно высокой скоростью и широко используется.

Kafka — это не только система распределенных сообщений, но и поддержка потоковых вычислений, поэтому, прежде чем представить приложение Kafka в Apache Flink, давайте возьмем простой пример Kafka, чтобы интуитивно понять, что такое Kafka.

Установить

Эта статья не является систематическим и подробным введением в Kafka, а предназначена для того, чтобы каждый мог интуитивно понять Kafka, чтобы его можно было хорошо применить в Apahe Flink, поэтому мы устанавливаем Kafka самым простым способом.

  • Скачать бинарный пакет

curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
  • Разархивируйте установку
    Для установки Kafka нужно только распаковать загруженный tgz следующим образом:

jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz 
jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
LICENSE        NOTICE        bin        config        libs        site-docs

Среди них bin содержит все команды управления Kafka, такие как сервер Kafka, который мы начнем дальше.

  • Запустить KafkaServer
    Kafka — это система публикации и подписки, и подписка на сообщения должна сначала иметь службу. Мы запускаем экземпляр Kafka Server. Kafka должен использовать ZooKeeper.Для производственного развертывания нам нужно установить кластер ZooKeeper, что не входит в рамки этой статьи, поэтому мы используем сценарий, предоставленный Kafka, для установки экземпляра ZooKeeper только с одним узлом. следующее:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
....
....
[2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

После запуска ZooKeeper привязывается к порту 2181 (по умолчанию). Затем мы запускаем сервер Kafka следующим образом:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
[2019-01-13 09:09:16,937] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
[2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
[2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Если все пойдет хорошо, установка Kafka завершена.

Создать тему

Kafka — это система подписки на сообщения.Сначала создайте тему, на которую можно подписаться.flink-tipicТема, в новом терминале выполните следующую команду:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic

Created topic "flink-tipic".

Следующая информация об успешном создании также будет выведена в терминал Kafka Server:

...
[2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
...

показано вышеflink-topicКонфигурация основных атрибутов, таких как метод сжатия сообщений, формат сообщений, количество резервных копий и т. д.

В дополнение к просмотру журнала мы можем использовать команду display для запроса, успешно ли мы создалиflink-topic,следующее:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181

flink-tipic

если выводflink-tipic, значит наша тема успешно создана.

Так где тема сохранена? Как Кафка публикует сообщения и подписывается на них? Для наглядности давайте посмотрим на следующую схематическую диаграмму архитектуры Kafka для простого понимания:

Чтобы кратко представить, Kafka использует ZooKeeper для хранения информации о кластере, то есть экземпляре сервера Kafka, который мы начали выше.В кластере может быть несколько экземпляров сервера Kafka.Сервер Kafka называется Broker, и темы, которые мы создаем, могут быть в одном или более Брокеры. Kafka использует режим Push для отправки сообщений и режим Pull для получения сообщений.

Отправить сообщение

Как отправить сообщение в существующую тему?Конечно, мы можем написать код для отправки сообщения по пути API. В то же время вы также можете использовать метод команды для удобной отправки сообщений следующим образом:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic
>Kafka test msg 
>Kafka connector

Выше мы отправили два сообщенияKafka test msgиKafka connectorприбытьflink-topicТема.

прочитать сообщение

Что делать, если сообщение указанной темы прочитано? То же самое можно сделать как API, так и командным способом, читаем в командном режимеflink-topicсообщение следующим образом:

jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning
Kafka test msg
Kafka connector

в--from-beginningОписывает, где мы читаем сообщение с начала темы.

Flink Kafka Connector

Ранее мы установили среду Kafka самым простым способом, а затем представили использование Flink Kafka Connector в указанной выше среде. Базовые знания, связанные с соединителем Flink, будут представлены в «Серии обсуждений Apache Flink (14) — соединители», здесь мы непосредственно представляем содержимое, связанное с соединителем Kafka.

Apache Flink предоставляет несколько версий Kafka Connector.В этой статье в качестве примера используется flink-1.7.0.

maven-зависимости

Чтобы использовать Kakfa Connector, нам нужно добавить зависимость от Kafka Connector в наш pom следующим образом:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.0</version>
</dependency>

Потребитель Flink Kafka должен знать, как преобразовывать двоичные данные в Kafka в объекты Java/Scala.DeserializationSchemaПозволяет пользователю указать такой режим. Вызовите метод T deserialize(byte[] message) для каждого сообщения Kafka, передав значение из Kafka.

Examples

В нашем примере данные считываются из Kafka, а затем после простой обработки записываются в Kafka. Нам нужно создать еще одну тему для написания, а именно:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output

Итак, в примере, который мы используем, источникflink-topic, для раковиныslink-topic-output.

Simple ETL

Мы предполагаем, что в Kafka хранится простая строка, поэтому нам нуженserializeиdeserializeреализации, то есть мы хотим определить реализациюDeserializationSchemaиSerializationSchemaКлассы сериализации и десериализации. Поскольку наш пример представляет собой строку, мы определяемKafkaMsgSchemaРеализуйте класс, а затем напишите основную программу Flink.

  • KafkaMsgSchema — полный код

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaMsgSchema() {
        // 默认UTF-8编码
        this(Charset.forName("UTF-8"));
    }

    public KafkaMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public String deserialize(byte[] message) {
        // 将Kafka的消息反序列化为java对象
        return new String(message, charset);
    }

    public boolean isEndOfStream(String nextElement) {
        // 流永远不结束
        return false;
    }

    public byte[] serialize(String element) {
       // 将java对象序列化为Kafka的消息
        return element.getBytes(this.charset);
    }

    public TypeInformation<String> getProducedType() {
        // 定义产生的数据Typeinfo
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }
}
  • Основная программа - полный код

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);

        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(
                sourceTopic,
                new KafkaMsgSchema(),
                p);
        // 设置读取最早的数据
//        consumer.setStartFromEarliest();

        // 读取Kafka消息
        DataStream<String> input = env.addSource(consumer);


        // 数据处理
        DataStream<String> result = input.map(new MapFunction<String, String>() {
            public String map(String s) throws Exception {
                String msg = "Flink study ".concat(s);
                System.out.println(msg);
                return msg;
            }
        });

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<String>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka Example");
    }
}

Запустите основную программу следующим образом:


Процесс моей тестовой операции выглядит следующим образом:

  1. запускатьflink-topicиflink-topic-outputтяга потребления;

  2. по командеflink-topicдобавить тестовое сообщениеonly for test;

  3. Подтвердите добавленное тестовое сообщение с помощью команды printonly for test;

  4. Простейший FlinkJobsource->map->sinkСопоставьте тестовое сообщение:"Flink study ".concat(s);

  5. Распечатать данные стока через команду;

#### Встроенные схемы
Apache Flink предоставляет следующие три встроенные схемы для распространенных форматов сообщений:

  • TypeInformationSerializationSchema (and TypeInformationKeyValueSerializationSchema)Он создает схему на основе TypeInformation Flink. Это полезно, если данные записываются и считываются Flink.

  • JsonDeserializationSchema (and JSONKeyValueDeserializationSchema)Он преобразует сериализованный JSON в объект ObjectNode, из которого можно получить доступ к полям с помощью objectNode.get("field") as (Int/String/...)(). Объектный узел KeyValue содержит поля «ключ» и «значение», которые содержат все поля и необязательное поле «метаданные», которое предоставляет смещение/раздел/тему этого сообщения.

  • AvroDeserializationSchemaОн считывает данные, сериализованные с использованием формата Avro, используя статически предоставленную схему. Он может вывести схему из классов, сгенерированных Avro (AvroDeserializationSchema.forSpecific(...)), или может использовать предоставленную вручную схему с GenericRecords (используя AvroDeserializationSchema.forGeneric(...))

Чтобы использовать встроенные схемы, вам необходимо добавить следующие зависимости:

 <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7.0</version>
</dependency>

прочитать конфигурацию местоположения

Когда мы используем данные Kafka, нам может потребоваться указать место потребления.FlinkKafkaConsumerПредоставляет множество удобных настроек местоположения, например:

  • Consumer.setStartFromEarliest() — начать с самой старой записи;

  • Consumer.setStartFromLatest() — начать с последней записи;

  • Consumer.setStartFromTimestamp(...);// начать с указанной временной метки эпохи (миллисекунды);

  • Consumer.setStartFromGroupOffsets(); // Поведение по умолчанию, продолжение потребления со смещения последнего потребления.

Приведенная выше спецификация позиции может быть точной для каждого раздела, например следующий код:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始

consumer.setStartFromSpecificOffsets(specificStartOffsets);

Это по-прежнему значение по умолчанию для неуказанных разделов.setStartFromGroupOffsetsСпособ.

Открытие темы

Kafka поддерживает автоматическое обнаружение темы, то есть создается обычным способом.FlinkKafkaConsumer,Например:

// 创建消费者
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(            java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
new KafkaMsgSchema(),
p);

В приведенном выше примере, когда задание запускается, потребитель подпишется на все темы, имена которых соответствуют указанному регулярному выражению (начиная сsourceTopicНачальное и конечное значения одного числа).

Определить водяной знак (окно)

Применение Kafka Connector не ограничивается описанным выше простым извлечением данных.Мы чаще ожидаем оконных операций с данными Kafka во время события, поэтому нам необходимо определить водяной знак в Flink Kafka Source.

Чтобы определить Event-time, во-первых, нужно передать атрибут времени в данные Kafka, предполагая, что наши данныеString#Longформат, напримерonly for test#1000. тогда мы будемLongкак столбец времени.

  • KafkaWithTsMsgSchema — полный код
    Чтобы проанализировать вышеуказанный формат данных Kafka, нам нужно разработать пользовательскую схему, например вызовKafkaWithTsMsgSchema,будетString#LongРазбор как javaTuple2<String, Long>, полный код выглядит следующим образом:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;

public class KafkaWithTsMsgSchema implements DeserializationSchema<Tuple2<String, Long>>, SerializationSchema<Tuple2<String, Long>> {
    private static final long serialVersionUID = 1L;
    private transient Charset charset;

    public KafkaWithTsMsgSchema() {
        this(Charset.forName("UTF-8"));
    }

    public KafkaWithTsMsgSchema(Charset charset) {
        this.charset = Preconditions.checkNotNull(charset);
    }

    public Charset getCharset() {
        return this.charset;
    }

    public Tuple2<String, Long> deserialize(byte[] message) {
        String msg = new String(message, charset);
        String[] dataAndTs = msg.split("#");
        if(dataAndTs.length == 2){
            return new Tuple2<String, Long>(dataAndTs[0], Long.parseLong(dataAndTs[1].trim()));
        }else{
            // 实际生产上需要抛出runtime异常
            System.out.println("Fail due to invalid msg format.. ["+msg+"]");
            return new Tuple2<String, Long>(msg, 0L);
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, Long> stringLongTuple2) {
        return false;
    }

    public byte[] serialize(Tuple2<String, Long> element) {
        return "MAX - ".concat(element.f0).concat("#").concat(String.valueOf(element.f1)).getBytes(this.charset);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(this.charset.name());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        String charsetName = in.readUTF();
        this.charset = Charset.forName(charsetName);
    }

    @Override
    public TypeInformation<Tuple2<String, Long>> getProducedType() {
        return new TupleTypeInfo<Tuple2<String, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    }
}
  • Генерация водяных знаков

Извлечение временных меток и создание водяных знаков требует реализации собственного извлечения времени и генератора водяных знаков. Внутри Apache Flink есть 2 способа:

  • AssignerWithPunctuatedWatermarks — водяные знаки создаются для каждой записи.

  • AssignerWithPeriodicWatermarks — периодическое создание водяных знаков.

    мы начинаем сAssignerWithPunctuatedWatermarksВ качестве примера напишите собственное извлечение времени и генератор водяных знаков. код показывает, как показано ниже:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class KafkaAssignerWithPunctuatedWatermarks
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> o, long l) {
        // 利用提取的时间戳创建Watermark
        return new Watermark(l);
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> o, long l) {
       // 提取时间戳
        return o.f1;
    }
}
  • Основная программа - Полная программа
    Мы вычисляем окно Tumble размером 1 секунду и вычисляем наибольшее значение в пределах окна. Полная процедура выглядит следующим образом:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaWithEventTimeExample {
    public static void main(String[] args) throws Exception {
        // 用户参数获取
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Stream 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置 Event-time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Source的topic
        String sourceTopic = "flink-topic";
        // Sink的topic
        String sinkTopic = "flink-topic-output";
        // broker 地址
        String broker = "localhost:9092";

        // 属性参数 - 实际投产可以在命令行传入
        Properties p = parameterTool.getProperties();
        p.putAll(parameterTool.getProperties());
        p.put("bootstrap.servers", broker);

        env.getConfig().setGlobalJobParameters(parameterTool);
        // 创建消费者
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Tuple2<String, Long>>(
                sourceTopic,
                new KafkaWithTsMsgSchema(),
                p);

        // 读取Kafka消息
        TypeInformation<Tuple2<String, Long>> typeInformation = new TupleTypeInfo<Tuple2<String, Long>>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

        DataStream<Tuple2<String, Long>> input = env
                .addSource(consumer).returns(typeInformation)
                // 提取时间戳,并生产Watermark
                .assignTimestampsAndWatermarks(new KafkaAssignerWithPunctuatedWatermarks());

        // 数据处理
        DataStream<Tuple2<String, Long>> result = input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
                .max(0);

        // 创建生产者
        FlinkKafkaProducer producer = new FlinkKafkaProducer<Tuple2<String, Long>>(
                sinkTopic,
                new KeyedSerializationSchemaWrapper<Tuple2<String, Long>>(new KafkaWithTsMsgSchema()),
                p,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // 将数据写入Kafka指定Topic中
        result.addSink(producer);

        // 执行job
        env.execute("Kafka With Event-time Example");
    }
}

Тест работает следующим образом

Чтобы объяснить кратко, мы вводим числа следующим образом:

Msg Watermark
E#1000000 1000000
A#3000000 3000000
B#5000000 5000000
C#5000100 5000100
E#5000120 5000120
A#7000000 7000000

что мы видим5000000~7000000данные, между которымиB#5000000, C#5000100иE#5000120является содержимым того же окна. Вычислить значение MAX, сравнить по строке, выводится самое большое сообщениеE#5000120.

Кафка несет временные метки

В Kafka-0.10+ сообщения могут содержать метки времени, а это означает, что нет необходимости добавлять столбец данных в виде меток времени отдельно в msg. Это проще только при использовании Flink как для записи, так и для чтения. В общем, вышеприведенного примера достаточно.

резюме

В этой статье рассказывается о том, как Kafka применяется во Flink. Она начинается с введения в простую установку Kafka и демонстрации команд для отправки и получения сообщений, а затем используется простое извлечение данных и пример окна во время события. чтобы вы интуитивно почувствовали, как использовать Kafka с Apache Flink.


Статьи, которые могут вас заинтересовать:

Более практические случаи будут обновлены позже...