А. Приемники данных
При использовании Flink для обработки данных данные поступают через источник данных, а затем преобразуются с помощью серии преобразований, и, наконец, результаты вычислений могут выводиться через приемник.Приемники данных Flink используются для определения конечного местоположения вывода данных. поток данных. Flink предоставляет несколько более простых API-интерфейсов приемника для ежедневной разработки, а именно:
1.1 writeAsText
writeAsText
Он используется для записи результатов расчета параллельно в указанную папку в текстовом режиме. Помимо обязательного параметра пути, этот метод также может определить режим вывода, указав второй параметр. Он имеет следующие два необязательных значения. :
- WriteMode.NO_OVERWRITE: Операция записи выполняется только при отсутствии файла по указанному пути;
- WriteMode.OVERWRITE: Независимо от того, есть ли файл по указанному пути, выполняется операция записи; если есть существующий файл, он будет перезаписан.
Пример использования следующий:
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE);
Приведенная выше запись записывается в несколько файлов параллельно.Если вы хотите записать все результаты вывода в один файл, вам нужно установить параллелизм равным 1:
streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
1.2 writeAsCsv
writeAsCsv
Он используется для записи результата расчета в указанный каталог в формате файла CSV.В дополнение к обязательному параметру пути этот метод также поддерживает ввод трех дополнительных параметров: режим вывода, разделитель строк и разделитель полей.Его определение метода как следует:
writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter)
1.3 print \ printToErr
print \ printToErr
Это наиболее часто используемый метод тестирования, который используется для вывода результатов вычислений на консоль в виде стандартного потока вывода или потока вывода ошибок.
1.4 writeUsingOutputFormat
Результаты расчета записываются в пользовательском формате вывода, как описано выше.writeAsText
иwriteAsCsv
Нижний слой вызывает этот метод, а исходный код выглядит следующим образом:
public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
tof.setWriteMode(writeMode);
return writeUsingOutputFormat(tof);
}
1.5 writeToSocket
writeToSocket
Используется для записи результата вычисления в Socket в указанном формате, пример использования следующий:
streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
2. Потоковые коннекторы
В дополнение к вышеупомянутым API, Flink также имеет встроенную серию соединителей, которые используются для ввода результатов вычислений в общие системы хранения или промежуточное программное обеспечение сообщений, как показано ниже:
- Apache Kafka (поддерживает источник и приемник)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Google PubSub (source/sink)
Помимо встроенных соединителей, вы можете расширить Flink через соединители Apache Bahir. Apache Bahir стремится обеспечить функциональное расширение для распределенных систем анализа данных (таких как Spark, Flink) и т. д. В настоящее время поддерживаются следующие коннекторы, связанные с Flink Sink:
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
Здесь, на основе интеграции источников Kafka, представленной в главе «Источники данных», также интегрируется Kafka Sink.Конкретные шаги заключаются в следующем.
3. Интеграция Kafka Sink
3.1 addSink
Flink предоставляет метод addSink для вызова пользовательского приемника или стороннего соединителя. Если вы хотите записать результат расчета в Kafka, вам необходимо использовать этот метод для вызова производителя Kafka FlinkKafkaProducer. Конкретный код выглядит следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.指定Kafka的相关配置属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
// 2.接收Kafka上的数据
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
// 3.定义计算结果到 Kafka ProducerRecord 的转换
KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
}
};
// 4. 定义Flink Kafka生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
kafkaSerializationSchema,
properties,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
// 5. 将接收到输入元素*2后写出到Kafka
stream.map((MapFunction<String, String>) value -> value + value).addSink(kafkaProducer);
env.execute("Flink Streaming");
3.2 Создать выходную тему
Создайте тему для вывода тестов:
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic flink-stream-out-topic
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3.3 Запуск потребителя
Запустите потребителя Kafka, чтобы просмотреть вывод программы Flink:
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic flink-stream-out-topic
3.4 Результаты испытаний
Отправьте сообщение программе Flink производителя Kafka и просмотрите преобразованный вывод программы Flink следующим образом:
Видно, что данные, отправленные производителем Kafka, нормально принимаются программой Flink и после конвертации выводятся в топик, соответствующий Kafka.
4. Пользовательская раковина
Помимо использования встроенных коннекторов сторонних производителей, Flink также поддерживает использование пользовательских приемников для удовлетворения разнообразных потребностей вывода. Чтобы реализовать собственный приемник, вам необходимо прямо или косвенно реализовать интерфейс SinkFunction. Обычно мы реализуем его абстрактный класс RichSinkFunction, который предоставляет больше методов, связанных с жизненным циклом, чем SinkFunction. Отношения между ними следующие:
Здесь мы берем пользовательский FlinkToMySQLSink в качестве примера для записи результатов вычислений в базу данных MySQL.Конкретные шаги заключаются в следующем:
4.1 Импорт зависимостей
Во-первых, вам нужно импортировать зависимости, связанные с MySQL:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.16</version>
</dependency>
4.2 Пользовательский приемник
Унаследовано от RichSinkFunction для реализации пользовательского приемника:
public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
private PreparedStatement stmt;
private Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
"?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
"root",
"123456");
String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
stmt = conn.prepareStatement(sql);
}
@Override
public void invoke(Employee value, Context context) throws Exception {
stmt.setString(1, value.getName());
stmt.setInt(2, value.getAge());
stmt.setDate(3, value.getBirthday());
stmt.executeUpdate();
}
@Override
public void close() throws Exception {
super.close();
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
}
}
4.3 Использование пользовательского приемника
Если вы хотите использовать собственный приемник, вам также необходимо вызвать метод addSink следующим образом:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Date date = new Date(System.currentTimeMillis());
DataStreamSource<Employee> streamSource = env.fromElements(
new Employee("hei", 10, date),
new Employee("bai", 20, date),
new Employee("ying", 30, date));
streamSource.addSink(new FlinkToMySQLSink());
env.execute();
4.4 Результаты испытаний
Запустите программу и наблюдайте, как база данных пишет:
База данных успешно записана, что означает успешную интеграцию пользовательского приемника.
Исходный код все вышеперечисленные случаи использования можно найти в этом репозитории:flink-kafka-integration
использованная литература
- приемники данных:this.apache.org/projects/legal…
- Потоковые коннекторы:this.apache.org/projects/legal…
- Коннектор Apache Kafka:this.apache.org/projects/legal…
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным