Во-первых, встроенный источник данных
Источник данных Flink используется для определения источника данных программ Flink.Flink официально предоставляет различные методы сбора данных, которые помогают разработчикам легко и быстро создавать входные потоки, а именно:
1.1 Сборка из файлов
1. readTextFile(path): прочитать текстовый файл в соответствии с форматом TextInputFormat и вернуть его содержимое в виде строки. Пример выглядит следующим образом:
env.readTextFile(filePath).print();
2. readFile(fileInputFormat, path): прочитать файл в соответствии с указанным форматом.
3. readFile(inputFormat, filePath, watchType, interval, typeInformation): Периодически читать файл в соответствии с указанным форматом. Значение каждого параметра следующее:
- inputFormat: формат ввода потока данных.
- filePath: Путь файла, который может быть путь в локальной файловой системе или файловом пути на HDFS.
-
watchType: режим чтения, он имеет два необязательных значения, которые
FileProcessingMode.PROCESS_ONCEа такжеFileProcessingMode.PROCESS_CONTINUOUSLY: первое означает, что данные по указанному пути считываются только один раз, а затем завершаются; второе означает, что путь регулярно сканируется и читается. Обратите внимание, что если для параметра watchType установлено значениеPROCESS_CONTINUOUSLY, то при изменении файла все его содержимое (как старое, так и новое) будет повторно обработано, что нарушит работу Flink.exactly-onceсемантика. - interval: интервал периодического сканирования.
- typeInformation: Тип элемента во входном потоке.
Пример использования следующий:
final String filePath = "D:\\log4j.properties";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.readFile(new TextInputFormat(new Path(filePath)),
filePath,
FileProcessingMode.PROCESS_ONCE,
1,
BasicTypeInfo.STRING_TYPE_INFO).print();
env.execute();
1.2 Создание коллекций
1. fromCollection(Collection): создан на основе коллекции, все элементы которой должны быть одного типа. Пример выглядит следующим образом:
env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
2. fromElements(T ...): Создан на основе элемента, все элементы должны быть одного типа. Пример выглядит следующим образом:
env.fromElements(1,2,3,4,5).print();
3. generateSequence(from, to): построить на основе заданного интервала последовательности. Пример выглядит следующим образом:
env.generateSequence(0,100);
4. fromCollection(Iterator, Class): Сборка на основе итераторов. Первый параметр используется для определения итератора, а второй параметр используется для определения типа выходного элемента. Пример использования следующий:
env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
Среди них CustomIterator — пользовательский итератор.Вот пример генерации данных в диапазоне от 1 до 100. Исходный код выглядит следующим образом. Следует отметить, что помимо реализации интерфейса Iterator, пользовательский итератор также должен реализовывать интерфейс сериализации Serializable, иначе будет выброшено исключение о сбое сериализации:
import java.io.Serializable;
import java.util.Iterator;
public class CustomIterator implements Iterator<Integer>, Serializable {
private Integer i = 0;
@Override
public boolean hasNext() {
return i < 100;
}
@Override
public Integer next() {
i++;
return i;
}
}
5. fromParallelCollection(SplittableIterator, Class): метод получает два параметра, второй параметр используется для определения типа выходного элемента, а первый параметр SplittableIterator — это абстрактный базовый класс итератора, который используется для разделения значения исходного итератора на несколько непересекающихся итераций. в устройстве.
1.3 Использование сокета
Flink предоставляет метод socketTextStream для создания потоков данных на основе сокетов.Метод socketTextStream имеет следующие четыре основных параметра:
- hostname:имя процессора;
- port: номер порта, если установлено значение 0, это означает, что номер порта назначается автоматически;
- delimiter: разделитель, используемый для разделения каждой записи;
- maxRetry: Сокет временно отключается, когда максимальный интервал повтора программы в секундах. Значение 0 указывает на отсутствие повторных попыток; отрицательное значение указывает на повторную попытку. Пример выглядит следующим образом:
env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();
2. Пользовательский источник данных
2.1 SourceFunction
Помимо встроенных источников данных, пользователи также могут использоватьaddSourceметод для добавления пользовательского источника данных. Пользовательский источник данных должен реализовывать интерфейс SourceFunction. Вот пример генерации данных в интервале [0, 1000). Код выглядит следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new SourceFunction<Long>() {
private long count = 0L;
private volatile boolean isRunning = true;
public void run(SourceContext<Long> ctx) {
while (isRunning && count < 1000) {
// 通过collect将输入发送出去
ctx.collect(count);
count++;
}
}
public void cancel() {
isRunning = false;
}
}).print();
env.execute();
2.2 ParallelSourceFunction и RichParallelSourceFunction
Источник данных, реализованный SourceFunction выше, не имеет параллелизма, то есть не поддерживает вызов результирующего DataStream.setParallelism(n)метод, будет выдано следующее исключение:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
Если вы хотите реализовать входной поток с параллелизмом, вам необходимо реализовать интерфейс ParallelSourceFunction или RichParallelSourceFunction, и его связь с SourceFunction выглядит следующим образом:
3. Потоковые коннекторы
3.1 Встроенные разъемы
Помимо пользовательских источников данных, Flink также имеет встроенные соединители для большинства сценариев сбора данных. Текущая поддержка встроенных соединителей выглядит следующим образом:
- Apache Kafka (поддерживает источник и приемник)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
В дополнение к указанным выше соединителям вы также можете расширить Flink через соединители Apache Bahir. Apache Bahir нацелен на функциональное расширение для распределенных систем анализа данных (таких как Spark, Flink) и т. д. В настоящее время поддерживаются следующие коннекторы, связанные с Flink:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
С постоянным развитием Flink ожидается, что он будет поддерживать все больше и больше типов соединителей.Для последующей разработки соединителей вы можете проверить его официальные документы:Streaming Connectors. Среди всех соединителей DataSource широко используется Kafka, поэтому здесь мы берем его в качестве примера, чтобы представить этапы интеграции соединителей.
3.2 Интеграция Кафки
1. Импорт зависимостей
При интеграции Kafka необходимо обращать внимание на используемую версию Kafka.Зависимости Maven, необходимые для разных версий, и классы, вызываемые во время разработки, различаются следующим образом:
| Зависимости Maven | Флинк версия | Имя класса Consumer и Producer | Кафка версия |
|---|---|---|---|
| flink-connector-kafka-0.8_2.11 | 1.0.0 + | FlinkKafkaConsumer08 FlinkKafkaProducer08 |
0.8.x |
| flink-connector-kafka-0.9_2.11 | 1.0.0 + | FlinkKafkaConsumer09 FlinkKafkaProducer09 |
0.9.x |
| flink-connector-kafka-0.10_2.11 | 1.2.0 + | FlinkKafkaConsumer010 FlinkKafkaProducer010 |
0.10.x |
| flink-connector-kafka-0.11_2.11 | 1.4.0 + | FlinkKafkaConsumer011 FlinkKafkaProducer011 |
0.11.x |
| flink-connector-kafka_2.11 | 1.7.0 + | FlinkKafkaConsumer FlinkKafkaProducer |
>= 1.0.0 |
Используемая здесь версия Kafka — kafka_2.12-2.2.0, и добавлены следующие зависимости:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>
2. Разработка кода
Возьмем для примера простейший сценарий, данные по Kafka принимаются и распечатываются, код выглядит следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 指定Kafka的连接位置
properties.setProperty("bootstrap.servers", "hadoop001:9092");
// 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
stream.print();
env.execute("Flink Streaming");
3.3 Интеграционное тестирование
1. Запустите Кафку
Работа Kafka зависит от zookeeper, который нужно запустить заранее.Вы можете запустить встроенный zookeeper Kafka, или можете запустить собственную установку:
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
Запускаем одноузловую кафку для тестирования:
# bin/kafka-server-start.sh config/server.properties
2. Создать тему
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-in-topic
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. Запустите продюсер
Здесь запускается производитель Kafka для отправки тестовых данных:
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
4. Результаты испытаний
Введите произвольные тестовые данные на Producer и посмотрите вывод консоли программы:
использованная литература
- источники данных:this.apache.org/projects/legal…
- Потоковые коннекторы:this.apache.org/projects/legal…
- Коннектор Apache Kafka:this.apache.org/projects/legal…
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным