Учебная комната Cabbage Java охватывает основные знания
1. Знакомство с каналом
Канал в переводе означает водный путь/труба/канав, основное назначение которого основано наИнкрементный анализ журнала базы данных MySQL,поставкаПодписка на добавочные данные и потребление.
Как работает канал:
- canal имитирует интерактивный протокол подчиненного устройства mysql, притворяется подчиненным устройством mysql и отправляет протокол дампа ведущему устройству mysql.
- Мастер mysql получает запрос на дамп и начинает передавать двоичный журнал подчиненному (то есть каналу)
- канал анализирует двоичные объекты журнала (исходный как поток байтов)
2. Быстро установите MySQL 5.7 с помощью Docker
- Вытяните официальное изображение mysql: 5.7:
docker pull mysql:5.7
- Создайте каталоги конфигурации и сопоставления данных mysql (каталоги могут быть любыми):
cd /docker/mysqld/data
cd /docker/mysqld/conf
- Создайте файл конфигурации config-file.cnf в каталоге /docker/mysqld/conf:
Содержимое config-file.cnf выглядит следующим образом, откройте binlog mysql:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
- Создайте и запустите контейнер, обратите внимание на начальный пароль и сопоставление каталогов:
docker run --name mysql_5.7 -v /docker/mysqld/data:/var/lib/mysql -v /docker/mysqld/conf:/etc/mysql/conf.d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=88021120 -d mysql:5.7
- Введите mysql и проверьте статус открытия binlog.:
SHOW VARIABLES LIKE '%log_bin%';
- Перейдите в каталог /docker/mysqld/data, чтобы убедиться, что binlog уже существует.:
- Создайте новую учетную запись канала и разрешите операции для подготовки к последующим действиям.:
-- 新建用户
CREATE USER canal IDENTIFIED BY 'canal';
-- 授权操作
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- 刷新权限
FLUSH PRIVILEGES;
3. Быстро установите Redis 6.2 с помощью Docker.
- Вытащите официальное изображение redis:6.2:
docker pull redis:6.2
- Создайте каталоги конфигурации Redis и сопоставления данных (каталоги могут быть по вашему вкусу):
cd /docker/redis/data
cd /docker/redis/conf
- Создайте файл конфигурации redis.conf в каталоге /docker/redis/conf.:
redis.conf можно скачать с официального сайта:download.Redis.IO/Redis - Сказал, что хотел бы...
Обратите внимание на изменение двух конфигураций, иначе удаленное соединение не сможет быть установлено.:
# 注释掉这个配置
# bind 127.0.0.1 -::1
# 保护模式‘yes’改为‘no’
protected-mode no
- Создайте и запустите контейнер, обратите внимание на включение сохранения и сопоставления каталогов.:
docker run --name redis_6.2 -v /docker/redis/data:/data -v /docker/redis/conf:/usr/local/etc/redis -p 6379:6379 -d redis:6.2 redis-server /usr/local/etc/redis/redis.conf --appendonly yes
4. Установите Canal-Admin и Canal-Deployer 1.1.4.
- Перейдите на официальный сайт, чтобы загрузить версию 1.1.4 canal.admin-1.1.4.tar.gz и canal.deployer-1.1.4.tar.gz.:
Адрес загрузки официального сайта канала:GitHub.com/Alibaba/Misery…
- Разархивируйте и загрузите в любой каталог:
- Перейдите в каталог /root/canal-admin/conf и импортируйте canal_manager.sql в MySQL.:
- Измените конфигурацию application.yml в каталоге /root/canal-admin/conf.:
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456
- Войдите в каталог /root/canal-admin/bin и запустите canal-admin:
sh /root/canal-admin/bin/startup.sh
Не забудьте открыть порты брандмауэра:
firewall-cmd --zone=public --add-port=8089/tcp --permanent
firewall-cmd --reload
- Войдите в консоль canal-admin, логин admin, пароль 123456:
http://(安装机器的IP):8089/
Теперь давайте установим canal-deployer, ресурсы виртуальной машины ограничены, мы используем метод развертывания на одной машине.
- Войдите в каталог /root/canal-deployer/conf и измените конфигурацию canal_local.properties.:
# register ip
canal.register.ip = (远程连接用的IP)
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
- Перейдите в каталог /root/canal-deployer/conf и скопируйте папку примера в экземпляр.:
- Перейдите в каталог /root/canal-deployer/bin и запустите canal-deployer с локальной конфигурацией.:
sh /root/canal-deployer/bin/startup.sh local
Не забудьте открыть порты брандмауэра:
firewall-cmd --zone=public --add-port=11110-11112/tcp --permanent
firewall-cmd --reload
- Войдите в консоль canal-admin, если ваша конфигурация правильная, автозапуск появится в списке серверов:
- Создайте новый экземпляр в управлении экземплярами, обратите внимание, что имя должно соответствовать имени только что скопированной папки:
- Загрузите шаблон конфигурации, измените конфигурацию:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
- Через несколько секунд, если ваша конфигурация верна, автозапуск автоматически появится в списке экземпляров.:
Нажмите на детали, чтобы просмотреть журнал следующим образом:
2021-03-31 18:03:01.644 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-03-31 18:03:01.649 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [instance/instance.properties]
2021-03-31 18:03:01.675 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2021-03-31 18:03:01.676 [canal-instance-scan-0] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [instance/instance.properties]
2021-03-31 18:03:01.707 [canal-instance-scan-0] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-instance
2021-03-31 18:03:01.708 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2021-03-31 18:03:01.708 [canal-instance-scan-0] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2021-03-31 18:03:01.721 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
5. Используйте Canal-Client для синхронизации данных в проекте Spring.
- Здесь каждый сам соберет простейший проект SpringBoot, для работы требуется уметь подключать MySQL и Redis, достаточно построить несколько относительно простых таблиц.Автор ниже может дать несколько простых эталонных конфигураций для простой сборки проекта SpingBoot+MybatisPlus+Redis.:
Зависимости файла pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.yly</groupId>
<artifactId>region</artifactId>
<version>1.0.0</version>
<name>region</name>
<description></description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Конфигурация файла application.yml:
server:
port: 8080
tomcat:
uri-encoding: UTF-8
spring:
redis:
# Redis数据库索引,默认为0
database: 0
# Redis端口
port: 6379
# Redis服务器主机
host: 192.168.138.128
lettuce:
pool:
# 连接池最大连接数
max-active: 8
# 连接池最大空闲
max-idle: 8
# 连接池最小空闲
min-idle: 2
# 连接池最大阻塞等待时间
max-wait: 1ms
# 超时时间
shutdown-timeout: 100ms
datasource:
name: yly_region
# 基本属性
url: jdbc:mysql://192.168.138.128:3306/yly_region?useSSL=false&useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&autoReconnect=true&rewriteBatchedStatements=true&allowMultiQueries=true
username: root
password: 88021120
type: com.alibaba.druid.pool.DruidDataSource
# druid相关配置
druid:
# 监控统计拦截的filters
filters: stat
# 配置初始化大小/最小/最大
initial-size: 10
min-idle: 10
max-active: 100
# 获取连接等待超时时间
max-wait: 60000
# 间隔多久进行一次检测,检测需要关闭的空闲连接
time-between-eviction-runs-millis: 60000
# 一个连接在池中最小生存的时间
min-evictable-idle-time-millis: 300000
validation-query: SELECT 'x'
test-while-idle: true
test-on-borrow: false
test-on-return: false
# 打开PSCache,并指定每个连接上PSCache的大小。oracle设为true,mysql设为false。分库分表较多推荐设置为false
pool-prepared-statements: false
max-pool-prepared-statement-per-connection-size: 20
# WebStatFilter配置
web-stat-filter:
enabled: true
url-pattern: /*
exclusions: /druid/*,*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico
session-stat-enable: true
session-stat-max-count: 100
# StatViewServlet配置
stat-view-servlet:
enabled: true
url-pattern: /druid/*
reset-enable: true
login-username: caijiaqi
login-password: 88021120
jackson:
date-format: yyyy-MM-dd HH:mm:ss
mybatis-plus:
mapper-locations: classpath:/mapper/*Mapper.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
Конфигурация файла MybatisPlusConfig.java:
@Configuration
@EnableTransactionManagement
@MapperScan("com.yly.region.mapper")
public class MybatisPlusConfig {
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
// 设置请求的页面大于最大页后操作, true调回到首页,false继续请求
paginationInterceptor.setOverflow(false);
// 设置最大单页限制数量,默认 500 条,-1 不受限制
paginationInterceptor.setLimit(500);
// 开启 count 的 join 优化,只针对部分 left join
paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
return paginationInterceptor;
}
}
Конфигурация файла RedisRepositoryConfig.java:
@Configuration
@EnableRedisRepositories
public class RedisRepositoryConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
Jackson2JsonRedisSerializer<?> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
- Зависимости указаны в файле pom, который включен в приведенный выше пример.:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
- Создайте новый клиент канала и полагайтесь на ApplicationRunner для запуска задачи синхронизации потока демона после запуска контейнера Spring (обратите внимание, что класс в пакете канала выбирается при импорте).:
@Slf4j
@Component
public class CanalClient implements ApplicationRunner {
@Resource
private RedisTemplate<String, String> redisTemplate;
private static final String TABLE_NAME = "yly_rule";
private static final String PRIMARY_KEY = "id";
private static final String SEPARATOR = ":";
private static final String CANAL_SERVER_HOST = "192.168.138.131";
private static final int CANAL_SERVER_PORT = 11111;
private static final String CANAL_INSTANCE = "instance";
private static final String USERNAME = "canal";
private static final String PASSWORD = "canal";
@Override
public void run(ApplicationArguments args) throws Exception {
this.initCanal();
}
public void initCanal() {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(CANAL_SERVER_HOST, CANAL_SERVER_PORT),
CANAL_INSTANCE, USERNAME, PASSWORD);
int batchSize = 1000;
try {
log.info("启动 canal 数据同步...");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
// 时间间隔1000毫秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
syncEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private void syncEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR data:" + entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
log.info("================> binlog[{}:{}] , name[{},{}] , eventType : {}",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
String tableName = entry.getHeader().getTableName();
if (!TABLE_NAME.equalsIgnoreCase(tableName)) continue;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
redisInsert(tableName, rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.UPDATE) {
printColumn(rowData.getAfterColumnsList());
redisUpdate(tableName, rowData.getAfterColumnsList());
} else if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
redisDelete(tableName, rowData.getBeforeColumnsList());
}
}
}
}
private void redisInsert(String tableName, List<CanalEntry.Column> columns) {
JSONObject json = new JSONObject();
for (CanalEntry.Column column : columns) {
json.put(column.getName(), column.getValue());
}
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.opsForValue().set(key, json.toJSONString());
log.info("redis数据同步新增,key:" + key);
break;
}
}
}
private void redisUpdate(String tableName, List<CanalEntry.Column> columns) {
JSONObject json = new JSONObject();
for (CanalEntry.Column column : columns) {
json.put(column.getName(), column.getValue());
}
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.opsForValue().set(key, json.toJSONString());
log.info("redis数据同步更新,key:" + key);
break;
}
}
}
private void redisDelete(String tableName, List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
if (PRIMARY_KEY.equalsIgnoreCase(column.getName())) {
String key = tableName + SEPARATOR + column.getValue();
redisTemplate.delete(key);
log.info("redis数据同步删除,key:" + key);
break;
}
}
}
private void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
log.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
- Создайте случайную таблицу для тестирования в соответствии с личными предпочтениями, автор использует таблицу yly_rule:
тест ВСТАВИТЬ:
Теперь давайте добавим новый фрагмент данных:
INSERT INTO `yly_rule`(`id`, `create_time`, `name`, `store_type`, `kilometers`)
VALUES (4, '2021-03-31 18:36:32', '玩具', 4, 10);
Видим выходной лог в проекте:
Перейдем в Redis для просмотра данных, данные были вставлены синхронно:
Тестовое ОБНОВЛЕНИЕ:
Теперь давайте изменим часть данных:
UPDATE `yly_rule` SET `kilometers` = 8 WHERE `id` = 4;
Видим выходной лог в проекте:
Перейдем к Redis для просмотра данных, и данные синхронно обновились:
тест УДАЛИТЬ:
Теперь давайте удалим часть данных:
DELETE FROM `yly_region`.`yly_rule` WHERE `id` = 4;
Видим выходной лог в проекте:
Заходим в Redis для просмотра данных, его уже нет.
6. Ретроспектива и размышление
Преимущество Canal в том, что нет вторжения в бизнес-код, поскольку он основан на мониторинге журналов binlog для синхронизации данных. Производительность в реальном времени также может быть квазиреальной, что является относительно распространенной схемой синхронизации данных для многих предприятий.
Вышеупомянутое является просто тестовым случаем.Canal синхронизирует бинлог MySQL в соответствии с шагом смещения.Правила маршрутизации можно настроить для каждого экземпляра, синхронизировать можно только часть контента.Бизнес-код также можно модифицировать сам по себе, не только для Redis, но и для других носителей данных, не только синхронизировать одни и те же данные, вы можете настроить структуру модели данных для преобразования.
В приведенном выше коде мы получаем прототип объекта данных синхронизации binlog и кратко представляем формат:
EntryProtocol.proto
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
Можно видеть, что согласно DDL (язык определения данных) язык определения данных, DML (язык манипулирования данными) язык манипулирования данными Можно различать различные операции,Операции DML также могут получать определенные имена баз данных, имена таблиц, имена полей и различную подробную информацию о данных до и после изменения данных.Мы можем выборочно синхронизировать данные в сочетании с бизнесом..
Конечно, это только введение.В реальных проектах обычно настраивается режим MQ.С RocketMQ или Kafka Canal будет отправлять данные в топик MQ, а затем обрабатывать их через потребителей очереди сообщений. Развертывание Canal также поддерживает кластеры и требует взаимодействия с ZooKeeper для управления кластерами.