Инструмент синхронизации данных - канал

Java

предисловие

Около двух лет назад я столкнулся с проблемой синхронизации данных в проекте.

В то время в системе были развернуты десятки экземпляров, которые были разделены на одну центральную платформу и N подцентровых платформ, и каждой системе соответствовал отдельный экземпляр базы данных.

На уровне базы данных есть такое требование:

  • База данных центральной платформы должна содержать данные всех системных платформ.
  • База данных вспомогательного центра содержит только данные этой системной платформы.
  • Может быть добавлен или изменен на центральной платформеМинутаДанные центральной платформы, но данные должны быть синхронизированы с соответствующей базой данных платформы вспомогательного центра в режиме реального времени.

Между этими десятками экземпляров базы данных нет четких отношений «главный-подчиненный», и необходимость синхронизации зависит от источника данных, поэтому синхронизацию «главный-подчиненный» MySQL использовать нельзя.

В то время автор экспериментировал с несколькими методами, и окончательный метод был основан на методе механизма перехватчика Mybatis + очередь сообщений.

Общий принцип заключается в перехвате транзакционных операций, таких как добавление, изменение и удаление, с помощью перехватчика Mybatis, инкапсуляции их в объект в соответствии с первичным ключом настраиваемых данных (идентификация источника и назначения данных) и доставке их в тема, соответствующая очереди сообщений. Затем каждая система слушает разные темы, использует данные и синхронизирует их с базой данных.

В следующий период времени я узнал о канале, компоненте с открытым исходным кодом. Оказалось, что это более просто, он может анализировать данные из бинарного журнала MySQL, отправлять их в очередь сообщений или в другое место.

1. Знакомство с каналом

Говоря о канале, это также бизнес-требование Alibaba для синхронизации данных. Поэтому с 2010 года компании, базирующиеся на Alibaba, постепенно пытались анализировать журналы на основе базы данных, чтобы получать дополнительные изменения для синхронизации, что привело к увеличению бизнеса по подписке и потреблению.

Службы, поддерживаемые инкрементальной подпиской и потреблением журналов:

  • Зеркальное отображение базы данных
  • Резервное копирование базы данных в режиме реального времени
  • Многоуровневый индекс (у продавцов и покупателей есть собственный индекс подбазы данных)
  • search build
  • Обновление бизнес-кэша
  • Важные деловые новости, такие как изменение цен

Мы можем выполнить ряд услуг, таких как синхронизация данных и обновление кеша, на основе механизма канала.

2. Стартовый канал

1, измените конфигурацию MySQL

Для самостоятельно созданной службы MySQL вам необходимо сначала включить функцию записи Binlog и настроить формат binlog в режим ROW.Конфигурация в my.cnf выглядит следующим образом:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

Затем создайте учетную запись для подключения к MySQL в качестве ведомого устройства MySQL.

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

2. Скачать

Скачать канал очень просто, посетитестраница релизовВыберите необходимый пакет для загрузки, а затем разархивируйте загруженный пакет в указанный каталог.

tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal

После завершения добычи мы можем увидеть такое каталог:

3, изменить конфигурацию

Перед запуском вам также необходимо изменить некоторую информацию о конфигурации.

Во-первых, найдитеcanal/conf/example,редактироватьinstance.propertiesВ файле конфигурации есть несколько ключевых моментов:

canal.instance.mysql.slaveId=1234               # canal模拟slaveid
canal.instance.master.address=127.0.0.1:3306    # MySQL数据库地址
canal.instance.dbUsername=canal                 # 作为slave角色的账户
canal.instance.dbPassword=canal                 # 作为slave角色的账户密码
canal.instance.connectionCharset = UTF-8        # 数据库编码方式对应Java中的编码类型
canal.instance.filter.regex=.*\\..*             # 表过滤的表达式
canal.mq.topic=example                          # MQ 主题名称

Данные, которые мы хотим, чтобы канал прослушивал и отправлял в очередь сообщений, также необходимо изменить.canal.propertiesфайл, здесь в основном конфигурация MQ. Здесь я использую версию RocketMQ для Alibaba Cloud со следующими параметрами:

# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台创建)
canal.mq.namespace = rocketmq实例id
canal.mq.topic=(在后台创建)

4, начало

Просто запустите сценарий запуска напрямую, чтобы запустить:./canal/bin/startup.sh. затем откройтеlogs/canal/canal.logфайл, вы можете увидеть эффект запуска.

2020-02-26 21:12:36.715 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

3. Запустите мониторинг MQ

Мы помещаем данные, отслеживаемые каналом, в очередь сообщений, затем следующим шагом является написание слушателя для использования данных.

Для удобства автор напрямую использует Ali Cloud версии Rocketmq, тестовый код выглядит следующим образом:

public static void main(String[] args) {
	Properties properties = new Properties();
	// 您在控制台创建的 Group ID
	properties.put(PropertyKeyConst.GROUP_ID, "GID_CANAL");
	// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
	properties.put(PropertyKeyConst.AccessKey, "accessKey");
	// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
	properties.put(PropertyKeyConst.SecretKey, "secretKey");
	// 设置 TCP 接入域名,到控制台的实例基本信息中查看
	properties.put(PropertyKeyConst.NAMESRV_ADDR,"http://MQ_INST_xxx.mq-internet.aliyuncs.com:80");
	// 集群订阅方式(默认)
	// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
	Consumer consumer = ONSFactory.createConsumer(properties);
	consumer.subscribe("example","*",new CanalListener());
	consumer.start();
	logger.info("Consumer Started");
}

В-четвертых, тест

После развертывания среды мы переходим к этапу тестирования, чтобы увидеть реальный эффект.

Мы беремt_accountНапример, в таблицу записывается идентификатор учетной записи и баланс учетной записи.

Сначала мы добавляем новую запись,insert into t_account (id,user_id,amount) values (4,4,200);

На данный момент данные, потребляемые MQ, следующие:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "200.0"
	}],
	"database": "seata",
	"es": 1582723607000,
	"id": 2,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582723607656,
	"type": "INSERT"
}

Из данных видно, что имя базы данных, имя таблицы, поля таблицы и содержание новых данных записываются подробно.

Затем мы также можем изменить эти данные:update t_account set amount = 150 where id = 4;

На данный момент данные, потребляемые MQ, следующие:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724016000,
	"id": 3,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": [{
		"amount": "200.0"
	}],
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724016353,
	"type": "UPDATE"
}

Видно, что помимо измененного контента, канал также используетoldПоле записывает значение поля перед модификацией.

Наконец, мы удаляем эти данные:delete from t_account where id = 4;

Соответственно, данные, потребляемые MQ, выглядят следующим образом:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724155000,
	"id": 4,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724155370,
	"type": "DELETE"
}

После отслеживания изменений в таблице базы данных вы можете обрабатывать данные в соответствии со своими бизнес-сценариями.

V. Резюме

Использование компонента канала может легко отслеживать завершение изменений данных. Если вы используете синхронизацию данных очереди сообщений, нужно лишь мало что нужно уделять особое внимание этому вопросам последовательности сообщений.

Сам бинлог упорядочен, а вот как обеспечить порядок после записи в mq - вопрос.

существуетпроблема последовательности mqЗдесь вы можете увидеть решения, связанные с порядком потребления канала.