Преобразование динамической таблицы Flink для потоковой передачи реального боя

Flink

Dynamic tableЭто логичная концепция в flink, и это динамическая таблица, которая позволяет передавать потоковые данные для поддержки табличного API и SQL. На следующем рисунке показана связь преобразования между потоком и динамической таблицей.Сначала поток преобразуется в динамическую таблицу, затем выполняются операции SQL на основе динамической таблицы для создания новой динамической таблицы, и, наконец, динамическая таблица преобразуется в поток. В этой статье основное внимание будет уделено 3 способам преобразования динамической таблицы в поток.

Примечание. Весь код, использованный в этой статье, основан на flink 1.9.0 иflink-table-planner-blink_2.11

Append-only stream

Официальное определение выглядит следующим образом:

A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.

То есть, еслиdynamic tableВключается только операция вставки новых данных, затем их можно преобразовать вappend-only stream, все данные добавляются к потоку.

Образец кода:

public class AppendOnlyExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./home"),
				new Tuple2<>("Bob", "./prod?id=3")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");

		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("select user,url from clicks where user='Mary'");

		DataStream ds = tEnv.toAppendStream(rTable, TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
		ds.print();
		env.execute();
	}
}

результат операции:

(Mary,./prod?id=8)
(Mary,./prod?id=6)

Retract stream

официальное определение

A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into an retract stream by encoding an INSERT change as add message, a DELETE change as retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row. The following figure visualizes the conversion of a dynamic table into a retract stream.

Образец кода:

public class RetractExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./home"),
				new Tuple2<>("Bob", "./prod?id=3")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");

		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("SELECT user,COUNT(url) FROM clicks GROUP BY user");

		DataStream ds = tEnv.toRetractStream(rTable, TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}));
		ds.print();
		env.execute();
	}
}

результат операции:

(true,(Mary,1))
(true,(Bob,1))
(false,(Mary,1))
(true,(Mary,2))
(true,(Liz,1))
(false,(Bob,1))
(true,(Bob,2))

заtoRetractStreamВозвращаемое значение представляет собойTuple2<Boolean, T>тип, первый элементtrueУказывает, что эти данные являются новыми данными для вставки,falseУказывает на старый фрагмент данных, который необходимо удалить. То есть можно разложить обновление определенного фрагмента данных в таблице на сначала удаление старого фрагмента данных, а затем вставку нового фрагмента данных.

Upsert stream

Официальное определение:

An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.

Разница между этим режимом и двумя описанными выше режимами заключается в том, что для преобразования динамической таблицы в поток Upsert необходимо реализовать UpsertStreamTableSink, а не использовать его напрямую.StreamTableEnvironmentдля преобразования.

Образец кода:

public class UpsertExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./home"),
				new Tuple2<>("Liz", "./prod?id=3"),
				new Tuple2<>("Mary", "./prod?id=7")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");

		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("SELECT user,COUNT(url) FROM clicks GROUP BY user");
		tEnv.registerTableSink("MemoryUpsertSink", new MemoryUpsertSink(rTable.getSchema()));
		rTable.insertInto("MemoryUpsertSink");

		env.execute();
	}

	private static class MemoryUpsertSink implements UpsertStreamTableSink<Tuple2<String, Long>> {
		private TableSchema schema;
		private String[] keyFields;
		private boolean isAppendOnly;

		private String[] fieldNames;
		private TypeInformation<?>[] fieldTypes;

		public MemoryUpsertSink() {

		}

		public MemoryUpsertSink(TableSchema schema) {
			this.schema = schema;
		}

		@Override
		public void setKeyFields(String[] keys) {
			this.keyFields = keys;
		}

		@Override
		public void setIsAppendOnly(Boolean isAppendOnly) {
			this.isAppendOnly = isAppendOnly;
		}

		@Override
		public TypeInformation<Tuple2<String, Long>> getRecordType() {
			return TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){});
		}

		@Override
		public void emitDataStream(DataStream<Tuple2<Boolean, Tuple2<String, Long>>> dataStream) {
			consumeDataStream(dataStream);
		}

		@Override
		public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Tuple2<String, Long>>> dataStream) {
			return dataStream.addSink(new DataSink()).setParallelism(1);
		}

		@Override
		public TableSink<Tuple2<Boolean, Tuple2<String, Long>>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
			MemoryUpsertSink memoryUpsertSink = new MemoryUpsertSink();
			memoryUpsertSink.setFieldNames(fieldNames);
			memoryUpsertSink.setFieldTypes(fieldTypes);
			memoryUpsertSink.setKeyFields(keyFields);
			memoryUpsertSink.setIsAppendOnly(isAppendOnly);

			return memoryUpsertSink;
		}

		@Override
		public String[] getFieldNames() {
			return schema.getFieldNames();
		}

		public void setFieldNames(String[] fieldNames) {
			this.fieldNames = fieldNames;
		}

		@Override
		public TypeInformation<?>[] getFieldTypes() {
			return schema.getFieldTypes();
		}

		public void setFieldTypes(TypeInformation<?>[] fieldTypes) {
			this.fieldTypes = fieldTypes;
		}
	}

	private static class DataSink extends RichSinkFunction<Tuple2<Boolean, Tuple2<String, Long>>> {
		public DataSink() {
		}

		@Override
		public void invoke(Tuple2<Boolean, Tuple2<String, Long>> value, Context context) throws Exception {
			System.out.println("send message:" + value);
		}
	}
}

результат операции:

send message:(true,(Mary,1))
send message:(true,(Bob,1))
send message:(true,(Mary,2))
send message:(true,(Liz,1))
send message:(true,(Liz,2))
send message:(true,(Mary,3))

Возвращаемое значение этого режима также являетсяTuple2<Boolean, T>Отличие от Retract в том, что обновление определенного фрагмента данных в таблице не возвращает удаление старых данных и вставку новых данных, а кажется, что определенный фрагмент данных действительно обновлен.

Побочная история Upsert Stream

Весь упомянутый выше контент взят с официального сайта flink, но приложены соответствующие образцы, которые могут позволить читателям более интуитивно почувствовать выходной эффект каждого режима. В Интернете также много переводов официальных документов, но почти не используются статьи или примеры.UpsertStreamTableSinkКогда и при каких обстоятельствах возвращать значениеTuple2<Boolean, T>Первый элементfalse? Нечего сказать, сразу переходите к примеру, просто измените sql в приведенном выше примере на следующий

String sql = "SELECT user, cnt " +
             "FROM (" +
                    "SELECT user,COUNT(url) as cnt FROM clicks GROUP BY user" +
                   ")" +
             "ORDER BY cnt LIMIT 2";

Возвращаемый результат:

send message:(true,(Mary,1))
send message:(true,(Bob,1))
send message:(false,(Mary,1))
send message:(true,(Bob,1))
send message:(true,(Mary,2))
send message:(true,(Liz,1))
send message:(false,(Liz,1))
send message:(true,(Mary,2))
send message:(false,(Mary,2))
send message:(true,(Liz,2))

Конкретный принцип можно посмотреть в исходном коде,org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRankа такжеorg.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSortLimitПри парсинге sql получаются разные стратегии следующими методами, что влияет на необходимость удаления исходных данных.

  def getStrategy(forceRecompute: Boolean = false): RankProcessStrategy = {
    if (strategy == null || forceRecompute) {
      strategy = RankProcessStrategy.analyzeRankProcessStrategy(
        inputRel, ImmutableBitSet.of(), sortCollation, cluster.getMetadataQuery)
    }
    strategy
  }

Знайте, когда генерировать ложные атрибутивные данные, для пониманияJDBCUpsertTableSinkа такжеHBaseUpsertTableSinkбудет большим подспорьем.

Суммировать

Читая документы flink, я всегда хочу испытать принципы на практике кода.Многие статьи переводят описание официального сайта на китайский язык, но эта статья «переводит» описание официального сайта в код, надеясь помочь читателям понять официальный веб-сайт, значение в исходном тексте.