Дебезиум использует cdc (включено)

визуализация данных
Дебезиум использует cdc (включено)

Оригинальный адрес поста в блоге:Дебезиум использует cdc (включено)

Введение

debezium — это распределенная платформа с открытым исходным кодом для регистрации изменений данных (cdc). Запустить и указать на базу данных, когда другие приложения выполняются с этой базой данныхinserts,updates,deleteВо время работы приложение быстро реагирует. debezium устойчив и отзывчив, поэтому ваше приложение может реагировать быстро, не теряя ни одного события. Запись debezium — это событие изменения на уровне строки для таблицы базы данных. В то же время debezium построен на kafka и тесно связан с kafka, поэтому он предоставляет соединитель kafka для использования, приемник debezium. Поддерживаемые базы данных: mysql, MongoDB, PostgreSQL, Oracle, SQL-сервер. В этой статье mysql используется в качестве источника данных для реализации функции, и ее необходимо изменить для отслеживания бинарного журнала msyql. Текущая версия 0.9.5.Final, а версия 0.10 находится в разработке.

настроить

В этой статье в основном используетсяEmbeddingФорма прослушивает события и синхронно обновляет базу данных.

Следующая часть в основном использует коннектор kafka для синхронизации обновлений в базе данных.

MySQL должен открыть binlog следующим образом. Но если вы используете образ debezium/mysql, он настраивается автоматически.

log-bin=mysql-bin #添加这一行就ok
binlog-format=ROW #选择row模式
server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

Tutorial

Первый эффект заключается в настройке коннектора kafka для получения записей о событиях debezium. Требуется 3 сервиса, зоокипер, какфа и дебезиум коннектор. Для начала здесь используется Docker, поэтому сначала вам нужно следовать за docker.

начать смотритель зоопарка

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

начать кафку

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

запустить mysql

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

запустить кафку подключить

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

Создайте коннектор debezium через http-запрос подключения

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

работа клиента mysql

пройти черезinvertoryДанные из любой таблицы базы данных

Создание слушателя может просмотреть журнал событий Debezium

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers

Встроенный

Здесь мы в основном используем встроенный метод для получения событий cdc без использования kafka и напрямую потребляем поток событий debezium. Сценарий заключается в том, что таблица в базе данных mysql изменяется, и эти изменения синхронизируются с другой базой данных mysql. На этот раз мы используем мониторингinventoryбазу данных и синхронизировать данные сinventory_back.

###конфигурация дебезиума

connector.class=io.debezium.connector.mysql.MySqlConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=offset.dat
offset.flush.interval.ms=60000

name=debezium-kafka-source
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
#database.dbname=inventory
database.whitelist=inventory
#database.whitelist=inventory,inventory_back
server.id=184054
database.server.name=dbserver1
#transforms=unwrap
#transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
#transforms.unwrap.drop.tombstones=false

database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=dbhistory.dat

Свойства и конвертировать конфигурацию

@Slf4j
@Configuration
public class DebeziumEmbeddedAutoConfiguration {

    @Bean
    public Properties embeddedProperties() {
        Properties propConfig = new Properties();
        try(InputStream propsInputStream = getClass().getClassLoader().getResourceAsStream("config.properties")) {
            propConfig.load(propsInputStream);
        } catch (IOException e) {
            log.error("Couldn't load properties", e);
        }
        PropertyLoader.loadEnvironmentValues(propConfig);
        return propConfig;
    }

    @Bean
    public io.debezium.config.Configuration embeddedConfig(Properties embeddedProperties) {
         return io.debezium.config.Configuration.from(embeddedProperties);
    }

    @Bean
    public JsonConverter keyConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), true);
        return converter;
    }

    @Bean
    public JsonConverter valueConverter(io.debezium.config.Configuration embeddedConfig) {
        JsonConverter converter = new JsonConverter();
        converter.configure(embeddedConfig.asMap(), false);
        return converter;
    }

}

Синхронизация DDL и DML

Это в основном используется для использования функции CommandLineRunner для запуска механизма EmbeddedEngine debezium, и после получения события cdchandleRecordДля обработки DDL и DML вам необходимо разобрать события cdc.SourceRecordключ и значение.

@Slf4j
@Order(2)
@Component
public class DebeziumEmbeddedRunner implements CommandLineRunner {

    @Autowired
    private io.debezium.config.Configuration embeddedConfig;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private NamedParameterJdbcTemplate namedTemplate;

    @Autowired
    private JsonConverter keyConverter;

    @Autowired
    private JsonConverter valueConverter;

    @Override
    public void run(String... args) throws Exception {
        EmbeddedEngine engine = EmbeddedEngine.create()
                .using(embeddedConfig)
                .using(this.getClass().getClassLoader())
                .using(Clock.SYSTEM)
                .notifying(this::handleRecord)
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);

        shutdownHook(engine);

        awaitTermination(executor);
    }

    /**
     * For every record this method will be invoked.
     */
    private void handleRecord(SourceRecord record) {
        logRecord(record);

        Struct payload = (Struct) record.value();
        if (Objects.isNull(payload)) {
            return;
        }
        String table = Optional.ofNullable(DebeziumRecordUtils.getRecordStructValue(payload, "source"))
                .map(s->s.getString("table")).orElse(null);

//        // 处理数据DML
        Envelope.Operation operation = DebeziumRecordUtils.getOperation(payload);
        if (Objects.nonNull(operation)) {
            Struct key = (Struct) record.key();
            handleDML(key, payload, table, operation);
            return;
        }
//
//        // 处理结构DDL
        String ddl = getDDL(payload);
        if (StringUtils.isNotBlank(ddl)) {
            handleDDL(ddl);
        }
    }

    private String getDDL(Struct payload) {
        String ddl = DebeziumRecordUtils.getDDL(payload);
        if (StringUtils.isBlank(ddl)) {
            return null;
        }
        String db = DebeziumRecordUtils.getDatabaseName(payload);
        if (StringUtils.isBlank(db)) {
            db = embeddedConfig.getString(MySqlConnectorConfig.DATABASE_WHITELIST);
        }
        ddl = ddl.replace(db + ".", "");
        ddl = ddl.replace("`" + db + "`.", "");
        return ddl;
    }

    /**
     * 执行数据库ddl语句
     *
     * @param ddl
     */
    private void handleDDL(String ddl) {
        log.info("ddl语句 : {}", ddl);
        try {
            jdbcTemplate.execute(ddl);
        } catch (Exception e) {
            log.error("数据库操作DDL语句失败,", e);
        }
    }

    /**
     * 处理insert,update,delete等DML语句
     *
     * @param key       表主键修改事件结构
     * @param payload   表正文响应
     * @param table     表名
     * @param operation DML操作类型
     */
    private void handleDML(Struct key, Struct payload, String table, Envelope.Operation operation) {
        AbstractDebeziumSqlProvider provider = DebeziumSqlProviderFactory.getProvider(operation);
        if (Objects.isNull(provider)) {
            log.error("没有找到sql处理器提供者.");
            return;
        }

        String sql = provider.getSql(key, payload, table);
        if (StringUtils.isBlank(sql)) {
            log.error("找不到sql.");
            return;
        }

        try {
            log.info("dml语句 : {}", sql);
            namedTemplate.update(sql, provider.getSqlParameterMap());
        } catch (Exception e) {
            log.error("数据库DML操作失败,", e);
        }
    }

    /**
     * 打印消息
     *
     * @param record
     */
    private void logRecord(SourceRecord record) {
        final byte[] payload = valueConverter.fromConnectData("dummy", record.valueSchema(), record.value());
        final byte[] key = keyConverter.fromConnectData("dummy", record.keySchema(), record.key());
        log.info("Publishing Topic --> {}", record.topic());
        log.info("Key --> {}", new String(key));
        log.info("Payload --> {}", new String(payload));
    }

    private void shutdownHook(EmbeddedEngine engine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Requesting embedded engine to shut down");
            engine.stop();
        }));
    }

    private void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                log.info("Waiting another 10 seconds for the embedded engine to shut down");
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

}

Парсеров провайдеров и полей таблиц слишком много, поэтому я не буду перечислять их здесь по одному.Как показано на рисунке ниже, поддерживается большинство типов полей MySQL. При необходимости вы можете следить за публичной учетной записью WeChat или электронной почтой и отвечать на комментарии.

image-20190624161745770

Структура тестовой таблицы

CREATE TABLE `demo` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `bigint_id` bigint(20) NOT NULL,
  `var_name` varchar(255) NOT NULL,
  `ex_tinyint` tinyint(4) DEFAULT NULL,
  `ex_char` char(255) DEFAULT NULL,
  `ex_json` json DEFAULT NULL COMMENT '水电费',
  `ex_text` text,
  `ex_year` year(4) DEFAULT NULL,
  `ex_time` time DEFAULT NULL,
  `ex_date` date DEFAULT NULL,
  `ex_datetime` datetime DEFAULT NULL,
  `ex_timestamp` timestamp NULL DEFAULT NULL,
  `ex_blob` blob,
  `ex_tinyblob` tinyblob,
  `ex_binary` binary(255) DEFAULT NULL,
  `ex_double` double(10,4) DEFAULT NULL,
  `ex_float` float(10,2) DEFAULT NULL,
  `ex_decimal` decimal(10,2) DEFAULT NULL,
  `ex_numeric` decimal(10,4) DEFAULT NULL,
  `ex_real` double(10,4) DEFAULT NULL,
  `ex_bit` bit(1) DEFAULT NULL,
  `ex_enum` enum('123','@@','22','水电费') DEFAULT '123',
  `ex_set` set('a','b','c','d') DEFAULT NULL,
  `ex_geometry` geometry DEFAULT NULL,
  `ex_point` point DEFAULT NULL,
  `ex_linestring` linestring DEFAULT NULL,
  `ex_polygon` polygon DEFAULT NULL,
  `ex_geometrycollection` geometrycollection DEFAULT NULL,
  `ex_multipoint` multipoint DEFAULT NULL,
  PRIMARY KEY (`id`,`bigint_id`,`var_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

результат

####события DDL

Видно, что таблица базы данныхbigint_idДлина поля изменена на 21. После прослушивания события: выполняется оператор ddl,inventory_backв библиотекеdemoТаблицаbigint_idДлина поля изменена на 21.

Publishing Topic --> dbserver1
2019-06-24 16:22:21.230  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Key --> {"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"databaseName"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeKey"},"payload":{"databaseName":"inventory"}}
2019-06-24 16:22:21.230  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"databaseName"},{"type":"string","optional":false,"field":"ddl"}],"optional":false,"name":"io.debezium.connector.mysql.SchemaChangeValue"},"payload":{"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364540,"gtid":null,"file":"mysql-bin.000006","pos":22530,"row":0,"snapshot":false,"thread":null,"db":null,"table":null,"query":null},"databaseName":"inventory","ddl":"ALTER TABLE `inventory`.`demo` \nMODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`"}}
2019-06-24 16:22:21.230 ERROR 14995 --- [pool-1-thread-1] c.example.embedded.DebeziumRecordUtils   : not find op field.
2019-06-24 16:22:21.231  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : ddl语句 : ALTER TABLE `demo` 
MODIFY COLUMN `bigint_id` bigint(21) NOT NULL AFTER `id`

image-20190624162446146

####событие вставки DML

существуетinventoryв библиотекеdemoПосле добавления новой записи есть следующие записи журнала, вы можете просмотреть оператор вставки темы, ключа, полезной нагрузки и DML. Результат синхронизирует данные дляinventory_backв библиотекеdemo.

2019-06-24 16:27:14.735  INFO 14995 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader  : 1 records sent during previous 00:04:53.506, last recorded offset: {ts_sec=1561364834, file=mysql-bin.000006, pos=23002, row=1, server_id=223344, event=2}
2019-06-24 16:27:14.737  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Publishing Topic --> dbserver1.inventory.demo
2019-06-24 16:27:14.737  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Key --> {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"}],"optional":false,"name":"dbserver1.inventory.demo.Key"},"payload":{"id":2,"bigint_id":1,"var_name":"老王"}}
2019-06-24 16:27:14.738  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : Payload --> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"bigint_id"},{"type":"string","optional":false,"field":"var_name"},{"type":"int16","optional":true,"field":"ex_tinyint"},{"type":"string","optional":true,"field":"ex_char"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"ex_json"},{"type":"string","optional":true,"field":"ex_text"},{"type":"int32","optional":true,"name":"io.debezium.time.Year","version":1,"field":"ex_year"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTime","version":1,"field":"ex_time"},{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"ex_date"},{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"ex_datetime"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"ex_timestamp"},{"type":"bytes","optional":true,"field":"ex_blob"},{"type":"bytes","optional":true,"field":"ex_tinyblob"},{"type":"bytes","optional":true,"field":"ex_binary"},{"type":"double","optional":true,"field":"ex_double"},{"type":"double","optional":true,"field":"ex_float"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision":"10"},"field":"ex_decimal"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"4","connect.decimal.precision":"10"},"field":"ex_numeric"},{"type":"double","optional":true,"field":"ex_real"},{"type":"boolean","optional":true,"field":"ex_bit"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"123,22"},"default":"123","field":"ex_enum"},{"type":"string","optional":true,"name":"io.debezium.data.EnumSet","version":1,"parameters":{"allowed":"a,b,c,d"},"field":"ex_set"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometry"},{"type":"struct","fields":[{"type":"double","optional":false,"field":"x"},{"type":"double","optional":false,"field":"y"},{"type":"bytes","optional":true,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Point","version":1,"doc":"Geometry (POINT)","field":"ex_point"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_linestring"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_polygon"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_geometrycollection"},{"type":"struct","fields":[{"type":"bytes","optional":false,"field":"wkb"},{"type":"int32","optional":true,"field":"srid"}],"optional":true,"name":"io.debezium.data.geometry.Geometry","version":1,"doc":"Geometry","field":"ex_multipoint"}],"optional":true,"name":"dbserver1.inventory.demo.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.demo.Envelope"},"payload":{"before":null,"after":{"id":2,"bigint_id":1,"var_name":"老王","ex_tinyint":1,"ex_char":"a","ex_json":"{\"abc\":123}","ex_text":"ert","ex_year":2019,"ex_time":59224000000,"ex_date":null,"ex_datetime":null,"ex_timestamp":null,"ex_blob":null,"ex_tinyblob":null,"ex_binary":null,"ex_double":null,"ex_float":null,"ex_decimal":null,"ex_numeric":null,"ex_real":null,"ex_bit":null,"ex_enum":"123","ex_set":null,"ex_geometry":null,"ex_point":null,"ex_linestring":null,"ex_polygon":null,"ex_geometrycollection":null,"ex_multipoint":null},"source":{"version":"0.9.3.Final","connector":"mysql","name":"dbserver1","server_id":223344,"ts_sec":1561364834,"gtid":null,"file":"mysql-bin.000006","pos":23194,"row":0,"snapshot":false,"thread":9,"db":"inventory","table":"demo","query":null},"op":"c","ts_ms":1561364834477}}
2019-06-24 16:27:14.738  INFO 14995 --- [pool-1-thread-1] c.e.embedded.DebeziumEmbeddedRunner      : dml语句 : insert into demo values (:id,:bigint_id,:var_name,:ex_tinyint,:ex_char,:ex_json,:ex_text,:ex_year,:ex_time,:ex_date,:ex_datetime,:ex_timestamp,:ex_blob,:ex_tinyblob,:ex_binary,:ex_double,:ex_float,:ex_decimal,:ex_numeric,:ex_real,:ex_bit,:ex_enum,:ex_set,:ex_geometry,:ex_point,:ex_linestring,:ex_polygon,:ex_geometrycollection,:ex_multipoint) 

image-20190624162949375

Событие обновления DML

существуетinventoryв библиотекеdemoПосле изменения записи только что добавлена ​​следующие записи журнала, вы можете просматривать тему, ключ, полезную нагрузку и сначала удалить, а затем вставить операторы. Результат синхронизирует данные сinventory_backв библиотекеdemo.

image-20190624163323988

image-20190624163443342

Событие удаления DML

существуетinventoryв библиотекеdemoПосле изменения вновь измененной записи и ее удаления можно найти следующие записи журнала, а также просмотреть тему, ключ, полезные данные и первый оператор удаления. Результат синхронизирует данные сinventory_backв библиотекеdemoудали это.这里有2个事件,第二条事件是一种标致,这里不处理。

бревно:

image-20190624163653205

image-20190624163939806

Ссылаться на

официальный сайт дебезиума

Tutorial

Embedding Debezium

Добро пожаловать

Обратите внимание, чтобы знать последние динамические обновления

微信公众号