Практика работы CDC Connectors в Flink1.11

Flink

Flink 1.11 представил коннектор CDC, который может легко фиксировать измененные данные и значительно упрощает процесс обработки данных. Разъем CDC Flink1.11 в основном включает в себя:MySQL CDCиPostgres CDC, а у КафкиConnectorслужба поддержкиcanal-jsonиdebezium-jsonа такжеchangelog-jsonформат. Эта статья в основном имеет следующее содержание:

  • Введение в CDC
  • формат таблицы предоставлен Flink
  • Примечания во время использования
  • Практика работы с Mysql-CDC
  • Практика работы canal-json
  • Практика работы с changelog-json

Введение

Flink CDC Connector — это набор соединителей источников данных для Apache Flink, который использует **сбор данных об изменениях (CDC))** для извлечения данных об изменениях из разных баз данных. Соединитель Flink CDC интегрирует Debezium в качестве механизма для регистрации изменений данных. Таким образом, он может полностью использовать возможности Debezium.

Функции

  • Поддерживает чтение моментальных снимков базы данных и может непрерывно читать журнал изменений базы данных даже в случае сбоя.exactly-onceсемантика обработки

  • Для соединителя CDC API DataStream пользователи могут использовать измененные данные в нескольких базах данных и таблицах в одном задании без развертывания Debezium и Kafka.

  • Для соединителя CDC API Table/SQL пользователи могут использовать SQL DDL для создания источника данных CDC для отслеживания изменений данных в одной таблице.

сцены, которые будут использоваться

  • Инкрементальная синхронизация данных между базами данных
  • Журнал аудита
  • Материализованные представления в реальном времени поверх базы данных
  • Объединение таблиц измерений на основе CDC

формат таблицы предоставлен Flink

Flink предоставляет ряд форматов таблиц, которые можно использовать для соединителей таблиц, а именно:

Formats Supported Connectors
CSV Apache Kafka, Filesystem
JSON Apache Kafka, Filesystem, Elasticsearch
Apache Avro Apache Kafka, Filesystem
Debezium CDC Apache Kafka
Canal CDC Apache Kafka
Apache Parquet Filesystem
Apache ORC Filesystem

Примечания во время использования

Примечания по использованию MySQL CDC

Если вы хотите использовать коннектор MySQL CDC, для программы необходимо добавить следующие зависимости:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.0.0</version>
</dependency>

Если вы хотите использовать клиент Flink SQL, вам необходимо добавить следующий пакет jar:flink-sql-connector-mysql-cdc-1.0.0.jar, поместите пакет jar в папку lib каталога установки Flink.

Примечания по использованию canal-json

Если вы хотите использовать canal-json Kafka, для программы вам нужно добавить следующие зависимости:

<!-- universal -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>


Если вы хотите использовать клиент Flink SQL, вам необходимо добавить следующий пакет jar:flink-sql-connector-kafka_2.11-1.11.0.jar, поместите пакет jar в папку lib каталога установки Flink. Поскольку пакет jar не указан в каталоге lib установочного пакета Flink1.11, пакет зависимостей необходимо добавить вручную, иначе будет сообщено о следующей ошибке:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
mysql-cdc

Примечания по использованию журнала изменений-json

Если вы хотите использовать формат журнала изменений Kafka-json, для программы вам необходимо добавить следующие зависимости:

<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.0.0</version>
</dependency>

Если вы хотите использовать клиент Flink SQL, вам необходимо добавить следующий пакет jar:flink-format-changelog-json-1.0.0.jar, поместите пакет jar в папку lib каталога установки Flink.

Практика работы с Mysql-CDC

Создать таблицу источника данных MySQL

Перед созданием таблицы MySQL CDC вам необходимо создать таблицу данных MySQL следующим образом:

-- MySQL
/*Table structure for table `order_info` */
DROP TABLE IF EXISTS `order_info`;
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
  `province_id` int(20) DEFAULT NULL COMMENT '地区',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
-- ----------------------------
-- Records of order_info
-- ----------------------------
INSERT INTO `order_info` 
VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
INSERT INTO `order_info`
VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
INSERT INTO `order_info`
VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);

/*Table structure for table `order_detail` */
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` 
VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
INSERT INTO `order_detail` 
VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
INSERT INTO `order_detail`
VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
INSERT INTO `order_detail` 
VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');

Flink SQL Cli для создания источника данных CDC

Запустите кластер Flink, запустите SQL CLI и выполните следующие команды:

-- 创建订单信息表
CREATE TABLE order_info(
    id BIGINT,
    user_id BIGINT,
    create_time TIMESTAMP(0),
    operate_time TIMESTAMP(0),
    province_id INT,
    order_status STRING,
    total_amount DECIMAL(10, 5)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_info'
);

Запросите данные этой таблицы в Flink SQL Cli: режим результата: таблица, + означает вставку данных

在这里插入图片描述

Создание таблицы сведений о заказе в SQL CLI в:

CREATE TABLE order_detail(
    id BIGINT,
    order_id BIGINT,
    sku_id BIGINT,
    sku_name STRING,
    sku_num BIGINT,
    order_price DECIMAL(10, 5),
	create_time TIMESTAMP(0)
 ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'kms-1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123qwe',
    'database-name' = 'mydw',
    'table-name' = 'order_detail'
);

Результаты запроса следующие:

在这里插入图片描述

Выполните операцию JOIN:

SELECT
    od.id,
    oi.id order_id,
    oi.user_id,
    oi.province_id,
    od.sku_id,
    od.sku_name,
    od.sku_num,
    od.order_price,
    oi.create_time,
    oi.operate_time
FROM
   (
    SELECT * 
    FROM order_info
    WHERE 
	     order_status = '2'-- 已支付
   ) oi
   JOIN
  (
    SELECT *
    FROM order_detail
  ) od 
  ON oi.id = od.order_id;

Практика работы canal-json

Для использования cannal вы можете обратиться к моей другой статье:Инкрементальная синхронизация данных в реальном времени на основе Canal и Flink (1). Я синхронизировал следующую таблицу с kafka через канал, конкретный формат:

{
    "data":[
        {
            "id":"1",
            "region_name":"华北"
        },
        {
            "id":"2",
            "region_name":"华东"
        },
        {
            "id":"3",
            "region_name":"东北"
        },
        {
            "id":"4",
            "region_name":"华中"
        },
        {
            "id":"5",
            "region_name":"华南"
        },
        {
            "id":"6",
            "region_name":"西南"
        },
        {
            "id":"7",
            "region_name":"西北"
        }
    ],
    "database":"mydw",
    "es":1597128441000,
    "id":102,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(20)",
        "region_name":"varchar(20)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "id":12,
        "region_name":12
    },
    "table":"base_region",
    "ts":1597128441424,
    "type":"INSERT"
}

Создайте таблицу формата canal-json в SQL CLI в:

CREATE TABLE region (
  id BIGINT,
  region_name STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json' ,
 'scan.startup.mode' = 'earliest-offset' 
);

Результаты запроса следующие:

在这里插入图片描述

Практика работы с changelog-json

Создать источник данных MySQL

см. вышеorder_info

Flink SQL Cli создает таблицу изменений-json

CREATE TABLE order_gmv2kafka (
  day_str STRING,
  gmv DECIMAL(10, 5)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order_gmv_kafka',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

INSERT INTO order_gmv2kafka
SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
FROM order_info
WHERE order_status = '2' -- 订单已支付
GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 

Запросите таблицу, чтобы увидеть результаты:

在这里插入图片描述

Еще раз проверьте данные Kafka:

{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}

При обновлении статуса order_status двух других ордеров на 2,总金额=443+772+88=1293Еще раз просмотрите данные:

在这里插入图片描述

Посмотрите еще раз на данные в kafka:

在这里插入图片描述

Суммировать

На основе Flink1.11 SQL описано использование недавно добавленного коннектора CDC. Включая коннектор MySQL CDC, формат canal-json и changelog-json, а также обратил внимание во время использования. Кроме того, использование бумаги дает полный пример, если у вас есть существующая среда, вы можете напрямую тестировать.

Официальный аккаунт «Технологии больших данных и хранилище данных», ответьте «Данные», чтобы получить пакет больших данных