карта разума
Эта статья была размещена на сайте личного блога (I love station B): me.lovebilibili.com
предисловие
Все мы знаем, что самое главное в системе — это данные, а данные хранятся в базе данных. Но во многих случаях он хранится не только в базе данных, но и синхронно хранится в Elastic Search, HBase, Redis и т. д.
В это время я заметил, что фреймворк Али с открытым исходным кодомCanal, он легко можетСинхронизируйте добавочные данные базы данных с другими приложениями хранения. Поэтому я подытожу его здесь и поделюсь с читателями для справки~
1. Что такое канал
Давайте сначала посмотрим на введение официального сайта
канал, переводится как водный путь/труба/канав, основное назначение которого основано наИнкрементный анализ журнала базы данных MySQL,поставкаПодписка на добавочные данные и потребление.
Во введении есть несколько ключевых слов:Инкрементный журнал, инкрементная подписка на данные и потребление.
Здесь мы можем просто понимать Канал какИнструмент для синхронизации инкрементных данных.
Далее давайте посмотрим на принципиальную схему, предоставленную официальным сайтом:
Как работает каналЗамаскируйте себя под ведомое устройство MySQL, смоделируйте интерактивный протокол ведомого устройства MySQL и отправьте протокол дампа в MySQL Mater, MySQL Mater получает запрос дампа, отправленный каналом, начинает передавать двоичный журнал в канал, а затем канал анализирует двоичный журнал и отправляет его в место хранения, такие как mysql, кафка, эластичный поиск и т. Д.
2. Что может сделать канал
Следующие ссылкиофициальный сайт канала.
Вместо того, чтобы спрашивать, что может сделать канал, что делает синхронизация данных.
Но синхронизация данных каналане полный, а пошаговый. Основываясь на инкрементной подписке и потреблении двоичного журнала, канал может:
- Зеркальное отображение базы данных
- Резервное копирование базы данных в режиме реального времени
- Создание индекса и обслуживание в реальном времени
- Обновление бизнес-кэша (кеша)
- Инкрементальная обработка данных с бизнес-логикой
3. Как построить канал
3.1 Сначала есть сервер MySQL
Текущий канал поддерживает исходные версии MySQL, включая 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x.
Сервер MySQL, установленный на моем сервере Linux, имеет версию 5.7.
Здесь не будет продемонстрирована установка MySQL, она относительно проста, и в Интернете есть множество учебных пособий.
Затем в MySQL нужно создать пользователя и авторизовать:
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';
Затем установите следующую информацию в файле конфигурации MySQL my.cnf:
[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
После изменения файла конфигурации перезапустите MySQL и с помощью команды проверьте, включен ли режим binlog:
Просмотрите список файлов журнала binlog:
Просмотрите записываемый в данный момент файл binlog:
Сервер MySQL сделан здесь, очень просто.
3.2 Установить канал
Перейдите на страницу загрузки официального сайта, чтобы скачать:GitHub.com/Alibaba/Misery…
Я скачал версию 1.1.4 здесь:
распаковатьcanal.deployer-1.1.4.tar.gzМы можем видеть четыре папки внутри:
Затем откройте файл конфигурации conf/example/instance.properties, информация о конфигурации выглядит следующим образом:
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=
Здесь я использую систему win10, поэтому найдите файл startup.bat в каталоге bin для запуска:
Сообщается об ошибке при запуске, pit:
Чтобы изменить сценарий запуска startup.bat:
Затем снова запустите скрипт:
Это началось успешно.
Операции Java Client
Впервые представленный maven зависит от:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
Затем создайте проект канала и соберите его с помощью SpringBoot, как показано ниже:
Используйте функцию жизненного цикла Spring Bean afterPropertiesSet() в классе CannalClient:
@Component
public class CannalClient implements InitializingBean {
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
Вышеприведенное завершает код клиента Java. Здесь нет конкретной обработки, просто печать, сначала есть интуитивное ощущение.
Наконец, мы начинаем тест, сначала запускаем MySQL, Canal Server и только что написанный проект Spring Boot. Затем создайте таблицу:
CREATE TABLE `tb_commodity_info` (
`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
`number` int(10) DEFAULT '0' COMMENT '商品数量',
`description` varchar(2048) DEFAULT '' COMMENT '商品描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
Затем мы можем увидеть следующую информацию в консоли:
Если вы добавите новый фрагмент данных в таблицу:
INSERT INTO tb_commodity_info VALUES('3e71a81fd80711eaaed600163e046cc3','叉烧包','3.99',3,'又大又香的叉烧包,老人小孩都喜欢');
Консоль может видеть следующую информацию:
Суммировать
Преимущество канала в том, чтоНет вторжения в бизнес-код,потому чтоСинхронизация данных на основе мониторинга журналов binlog. Производительность в реальном времени также может быть квазиреальной, что является относительно распространенной схемой синхронизации данных для многих предприятий.
Изучив вышеизложенное, мы должны понять, что такое канал, его принцип и использование. На самом деле это только начало, потому что в реальном проекте мы так не играем...
Актуальный проект мыНастройте режим MQ, сотрудничайте с RocketMQ или Kafka, канал будет отправлять данные в тему MQ, а затем обрабатывать их через потребителей очереди сообщений..
Развертывание Canal также поддерживает кластеры и требует взаимодействия с ZooKeeper для управления кластерами.
Canal также имеет простой интерфейс веб-администрирования.
Я расскажу об этом в следующей статьеКластерное развертывание Canal с Kafka для синхронизации данных с Redis.
Использованная литература:официальный сайт канала
болтовня
Код для всех приведенных выше примеров был загружен на Github:
Если вы считаете, что эта статья была вам полезна, ставьте лайк~
Ваши лайки - самая большая мотивация для моего творчества~
Если вы хотите впервые увидеть мои обновленные статьи, вы можете выполнить поиск в общедоступной учетной записи на WeChat "java技术爱好者
",Не хочу быть соленой рыбой, я программист, стремящийся запомниться всем. Увидимся в следующий раз! ! !
Возможности ограничены, если есть какие-то ошибки или неуместности, просьба критиковать и исправлять их, учиться и обмениваться вместе!