Серия Flink (3) - Источник данных Flink

Flink

Во-первых, встроенный источник данных

Источник данных Flink используется для определения источника данных программ Flink.Flink официально предоставляет различные методы сбора данных, которые помогают разработчикам легко и быстро создавать входные потоки, а именно:

1.1 Сборка из файлов

1. readTextFile(path): прочитать текстовый файл в соответствии с форматом TextInputFormat и вернуть его содержимое в виде строки. Пример выглядит следующим образом:

env.readTextFile(filePath).print();

2. readFile(fileInputFormat, path): прочитать файл в соответствии с указанным форматом.

3. readFile(inputFormat, filePath, watchType, interval, typeInformation): Периодически читать файл в соответствии с указанным форматом. Значение каждого параметра следующее:

  • inputFormat: формат ввода потока данных.
  • filePath: Путь файла, который может быть путь в локальной файловой системе или файловом пути на HDFS.
  • watchType: режим чтения, он имеет два необязательных значения, которыеFileProcessingMode.PROCESS_ONCEа такжеFileProcessingMode.PROCESS_CONTINUOUSLY: первое означает, что данные по указанному пути считываются только один раз, а затем завершаются; второе означает, что путь регулярно сканируется и читается. Обратите внимание, что если для параметра watchType установлено значениеPROCESS_CONTINUOUSLY, то при изменении файла все его содержимое (как старое, так и новое) будет повторно обработано, что нарушит работу Flink.exactly-onceсемантика.
  • interval: интервал периодического сканирования.
  • typeInformation: Тип элемента во входном потоке.

Пример использования следующий:

final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
             filePath,
             FileProcessingMode.PROCESS_ONCE,
             1,
             BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();

1.2 Создание коллекций

1. fromCollection(Collection): создан на основе коллекции, все элементы которой должны быть одного типа. Пример выглядит следующим образом:

env.fromCollection(Arrays.asList(1,2,3,4,5)).print();

2. fromElements(T ...): Создан на основе элемента, все элементы должны быть одного типа. Пример выглядит следующим образом:

env.fromElements(1,2,3,4,5).print();

3. generateSequence(from, to): построить на основе заданного интервала последовательности. Пример выглядит следующим образом:

env.generateSequence(0,100);

4. fromCollection(Iterator, Class): Сборка на основе итераторов. Первый параметр используется для определения итератора, а второй параметр используется для определения типа выходного элемента. Пример использования следующий:

env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();

Среди них CustomIterator — пользовательский итератор.Вот пример генерации данных в диапазоне от 1 до 100. Исходный код выглядит следующим образом. Следует отметить, что помимо реализации интерфейса Iterator, пользовательский итератор также должен реализовывать интерфейс сериализации Serializable, иначе будет выброшено исключение о сбое сериализации:

import java.io.Serializable;
import java.util.Iterator;

public class CustomIterator implements Iterator<Integer>, Serializable {
    private Integer i = 0;

    @Override
    public boolean hasNext() {
        return i < 100;
    }

    @Override
    public Integer next() {
        i++;
        return i;
    }
}

5. fromParallelCollection(SplittableIterator, Class): метод получает два параметра, второй параметр используется для определения типа выходного элемента, а первый параметр SplittableIterator — это абстрактный базовый класс итератора, который используется для разделения значения исходного итератора на несколько непересекающихся итераций. в устройстве.

1.3 Использование сокета

Flink предоставляет метод socketTextStream для создания потоков данных на основе сокетов.Метод socketTextStream имеет следующие четыре основных параметра:

  • hostname:имя процессора;
  • port: номер порта, если установлено значение 0, это означает, что номер порта назначается автоматически;
  • delimiter: разделитель, используемый для разделения каждой записи;
  • maxRetry: Сокет временно отключается, когда максимальный интервал повтора программы в секундах. Значение 0 указывает на отсутствие повторных попыток; отрицательное значение указывает на повторную попытку. Пример выглядит следующим образом:
 env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();

2. Пользовательский источник данных

2.1 SourceFunction

Помимо встроенных источников данных, пользователи также могут использоватьaddSourceметод для добавления пользовательского источника данных. Пользовательский источник данных должен реализовывать интерфейс SourceFunction. Вот пример генерации данных в интервале [0, 1000). Код выглядит следующим образом:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new SourceFunction<Long>() {
    
    private long count = 0L;
    private volatile boolean isRunning = true;

    public void run(SourceContext<Long> ctx) {
        while (isRunning && count < 1000) {
            // 通过collect将输入发送出去 
            ctx.collect(count);
            count++;
        }
    }

    public void cancel() {
        isRunning = false;
    }

}).print();
env.execute();

2.2 ParallelSourceFunction и RichParallelSourceFunction

Источник данных, реализованный SourceFunction выше, не имеет параллелизма, то есть не поддерживает вызов результирующего DataStream.setParallelism(n)метод, будет выдано следующее исключение:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source

Если вы хотите реализовать входной поток с параллелизмом, вам необходимо реализовать интерфейс ParallelSourceFunction или RichParallelSourceFunction, и его связь с SourceFunction выглядит следующим образом:

https://github.com/heibaiying
ParallelSourceFunction напрямую наследуется от ParallelSourceFunction и имеет функцию параллелизма. RichParallelSourceFunction наследуется от AbstractRichFunction и реализует интерфейс ParallelSourceFunction, поэтому помимо функции параллелизма предоставляет также дополнительные методы, связанные с жизненным циклом, такие как open(), close().

3. Потоковые коннекторы

3.1 Встроенные разъемы

Помимо пользовательских источников данных, Flink также имеет встроенные соединители для большинства сценариев сбора данных. Текущая поддержка встроенных соединителей выглядит следующим образом:

  • Apache Kafka (поддерживает источник и приемник)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

В дополнение к указанным выше соединителям вы также можете расширить Flink через соединители Apache Bahir. Apache Bahir нацелен на функциональное расширение для распределенных систем анализа данных (таких как Spark, Flink) и т. д. В настоящее время поддерживаются следующие коннекторы, связанные с Flink:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

С постоянным развитием Flink ожидается, что он будет поддерживать все больше и больше типов соединителей.Для последующей разработки соединителей вы можете проверить его официальные документы:Streaming Connectors. Среди всех соединителей DataSource широко используется Kafka, поэтому здесь мы берем его в качестве примера, чтобы представить этапы интеграции соединителей.

3.2 Интеграция Кафки

1. Импорт зависимостей

При интеграции Kafka необходимо обращать внимание на используемую версию Kafka.Зависимости Maven, необходимые для разных версий, и классы, вызываемые во время разработки, различаются следующим образом:

Зависимости Maven Флинк версия Имя класса Consumer и Producer Кафка версия
flink-connector-kafka-0.8_2.11 1.0.0 + FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x
flink-connector-kafka-0.9_2.11 1.0.0 + FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x
flink-connector-kafka-0.10_2.11 1.2.0 + FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x
flink-connector-kafka-0.11_2.11 1.4.0 + FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x
flink-connector-kafka_2.11 1.7.0 + FlinkKafkaConsumer
FlinkKafkaProducer
>= 1.0.0

Используемая здесь версия Kafka — kafka_2.12-2.2.0, и добавлены следующие зависимости:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

2. Разработка кода

Возьмем для примера простейший сценарий, данные по Kafka принимаются и распечатываются, код выглядит следующим образом:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的连接位置
properties.setProperty("bootstrap.servers", "hadoop001:9092");
// 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");

3.3 Интеграционное тестирование

1. Запустите Кафку

Работа Kafka зависит от zookeeper, который нужно запустить заранее.Вы можете запустить встроенный zookeeper Kafka, или можете запустить собственную установку:

# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

Запускаем одноузловую кафку для тестирования:

# bin/kafka-server-start.sh config/server.properties

2. Создать тему

# 创建用于测试主题
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic flink-stream-in-topic

# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. Запустите продюсер

Здесь запускается производитель Kafka для отправки тестовых данных:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic

4. Результаты испытаний

Введите произвольные тестовые данные на Producer и посмотрите вывод консоли программы:

https://github.com/heibaiying
Вывод консоли программы следующий:

https://github.com/heibaiying
Вы можете видеть, что соответствующие данные были успешно получены и распечатаны.

использованная литература

  1. источники данных:this.apache.org/projects/legal…
  2. Потоковые коннекторы:this.apache.org/projects/legal…
  3. Коннектор Apache Kafka:this.apache.org/projects/legal…

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным