WX искал [внутренние дела программиста], и ответ [666] был замечательным.
1. Что такое канал?
canal
Это инфраструктура, разработанная Alibaba, которая обеспечивает подписку и потребление дополнительных данных на основе анализа добавочного журнала базы данных.JAVA
разработка, в настоящее время поддерживается толькоMysql
иMariaDB
(аналогично mysql).
Так что же такое инкрементный журнал базы данных?
Существует много типов журналов MySQL, в основном в том числе: журналы ошибок, журналы запросов, журналы медленных запросов, журналы транзакций и двоичные журналы. иMySQL
Изменения данных, происходящие в базе данных (DML
(язык манипулирования данными) язык манипулирования данными, то есть мы знакомы с добавлениями, удалениями и изменениями), будет в бинарном логе (binary log
) в виде хранения.
2. Принцип канала
Представляемcanal
Прежде чем принцип, давайте рассмотримMySQL
Принцип синхронизации master-slave, который может дать вам лучшее пониманиеcanal
рабочий механизм.
1. Принцип синхронизации master-slave MySQL:
Синхронизация master-slave MySQL также называется разделением чтения и записи, что может улучшить нагрузку и отказоустойчивость базы данных и обеспечить высокую доступность базы данных.
Давайте сначала проанализируем принципиальную схему синхронизации master-slave MySQL:
Вышеуказанные фотографии взяты из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь с нами, чтобы удалить
Процесс работы мастер-узла:
когдаmaster
После изменения данных узла (удаления, обновления, вставки или создания функций, хранимых процедур и т. д.)binary log
Пишите записи в журнал, эти записи еще называют二进制日志事件
(бинарные события журнала).
show binlog events
Эти события последовательно записываются в бинарный журнал. Когда подчиненный узел начинает подключаться к главному узлу, главный узел открывает поток дампа binlog для подчиненного узла (отвечающего за передачу данных binlog).
Как только бинарный журнал главного узла изменится, бинарный журналПоток дампа уведомит подчиненный узел о наличии бинарных журналов, которые можно передать, и отправит соответствующее содержимое бинарного журнала на подчиненный узел.
Процесс работы подчиненного узла:
На ведомом узле будут созданы два потока: одинпоток ввода/вывода,ОдинSQL-поток. Поток ввода/вывода подключается к главному узлу, аbinlog dump
Поток отправит содержимое binlog потоку ввода-вывода.
После того, как поток ввода/вывода получает содержимое binlog, он записывает содержимое в локальный журнал ретрансляции. Поток sql читает журнал ретрансляции, записанный потоком ввода-вывода, и записывает содержимое журнала ретрансляции в подчиненную базу данных.
2, принцип канала
После понимания вышеприведенного принципа синхронизации master-slave в MySQL рабочий механизм канала становится понятным. По сути, канал имитирует протокол взаимодействия между подчиненным узлом и главным узлом в базе данных MySQL, притворяется подчиненным узлом MySQL и отправляет сообщение главному узлу MySQL.dump协议
, главный узел MySQL получает запрос на создание дампа и начинает передавать двоичный журнал подчиненному узлу (то есть,canal
).
Вышеуказанные фотографии взяты из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь с нами, чтобы удалить
Просто говорите и не практикуйте фальшивую ручку, давайте сделаем это!
В-третьих, канал реализует «мониторинг» MySQL.
Прежде чем писать код, давайте внесем некоторые изменения в MySQL, я не буду вдаваться в подробности установки MySQL, а основные операции будут выполнены.
1. Проверьте, включена ли в MySQL функция двоичного журнала.
show binary logs
Если он не включен, он находится в состоянии, показанном на картинке.Обычные пользователи не имеют права на эту команду, но я имею, цк цк цк!Если ручная активация не требуется и
my.cnf
конфигурация в файлеbinlog-format
заRow
модель
log-bin=mysq-bin
binlog-format=Row
log-bin
даbinlog
место хранения файловbinlog-format
Установите способ, которым MySQL реплицирует лог-бин
Три метода репликации MySQL:
Репликация на основе операторов (SBR)
- Преимущества: sql, который изменяет данные, хранится в binlog, нет необходимости записывать каждый sql и изменения данных, объем binlog будет небольшим, накладные расходы ввода-вывода низкие, а производительность хорошая.
- Недостаток: это приведет к несогласованности данных в master-slave.
репликация на основе строк (RBR)
- Преимущества: Контекстная информация каждого SQL-оператора не записывается, только какие данные были изменены и что было изменено.
- Недостатки: Объем бинарного журнала очень велик, особенно при использовании атрибута alter table будет сгенерирован большой объем данных бинарного журнала.
Смешанная репликация (MBR)
- Соответственно есть три формата бинлога: STATEMENT, ROW, MIXED.
2. Создайте пользователя с разрешением на работу с MySQL для канала.
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3. Установите канал
Адрес загрузки: https://github.com/alibaba/canal/releases
После скачивания выберите версию например: canal.deployer-xxx.tar.gz
4. Настроить канал
Для изменения файла instance.properties необходимо добавить правила мониторинга баз данных и таблиц, Canal может отслеживать всю базу данных или таблицу, что более гибко.
vim conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020
# position info 修改自己的数据库(canal要监听的数据库 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex 表的监听规则
# canal.instance.filter.regex = blogs\.blog_info
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex =
пусковой канал
sh bin/startup.sh
Посмотрите журнал сервера, чтобы убедиться, что канал запускается нормально.
vi logs/canal/canal.log
Сервер Display Canal работает сейчас успешно
2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
5. Напишите клиентский код Java для реализации мониторинга каналов.
импортировать пакет зависимостей
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
Вот простая реализация
public class MainApp {
public static void main(String... args) throws Exception {
/**
* 创建与
*/
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
/**
* 监控数据库中所有表
*/
connector.subscribe(".*\\..*");
/**
* 指定要监控的表,库名.表名
*/
//connector.subscribe("xin-master.jk_order");
connector.rollback();
//120次心跳过后未检测到,跳出
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
/**
* 提交确认
*/
connector.ack(batchId);
/**
* 处理失败, 回滚数据
*/
connector.rollback(batchId);
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
/**
* 手动开启事务回滚
*/
//TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
.EntryType
.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
Здесь написан код, запустим сервис, посмотрим, что получится, так как база не работает, результаты мониторинга пустые.Далее мы выполняем
update
предложение попробовать
update jk_orderset order_no = '1111' where id = 40
Консоль обнаруживает изменения базы данных и создает файл журнала binlog.mysql-bin.000009:3830
Итак, как использовать сгенерированный файл binlog и как преобразовать его в инструкцию SQl?
<!-- mysql binlog解析 -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.13.0</version>
</dependency>
Загрузите файл binlog прямо сейчас и протестируйте его локально
public static void main(String[] args) throws IOException {
String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";
File binlogFile = new File(filePath);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setChecksumType(ChecksumType.CRC32);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
try {
for (Event event; (event = reader.readEvent()) != null; ) {
System.out.println(event.toString());
}
} finally {
reader.close();
}
}
Проверьте результат выполнения и обнаружите, что самой последней операцией базы данных является добавление индекса idx_index.
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
DROP INDEX `idx_index` ,
ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}}
Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null}
До сих пор мы реализовали мониторинг MySQL,
Четыре сценария применения канала
canal
Сценарии применения примерно следующие:
- Решить проблему задержки синхронизации master-slave в MySQL.
- Резервное копирование базы данных в режиме реального времени
- Многоуровневый индекс (у продавцов и покупателей есть собственный индекс подбазы данных)
- Реализовать обновление бизнес-кэша
- Важные деловые новости, такие как изменение цен
Сосредоточьтесь на анализе того, как канал решает проблему задержки синхронизации master-slave в MySQL.
в производственной средеMySQL
Синхронный режим ведущий-ведомый (maser-slave
) очень распространен, но для кластеров, развернутых в компьютерных залах, будут возникать задержки синхронизации. Возьмите каштан:
Статус заказа не оплачен,master
Нода модифицируется на платную, но по каким-то причинам задержанные данные не могут быть синхронизированы сslave
, то пользователь сразу проверяет статус заказа (запросы идутslave
) показывает, что платеж до сих пор не оплачен, поэтому пользователь не должен паниковать, увидев эту ситуацию.
Почему возникает задержка синхронизации ведущий-ведомый?
Когда основная библиотекаmaster
изTPS
Когда параллелизм высокий,master
Узлы одновременно генерируют операции модификации, в то время какslave
узлаsql线程
Это однопоточная обработка синхронных данных, и задержка генерируется естественным образом.
мы используемcanal
мониторинг в реальном времениmaser
Обновление данных узла (можно отслеживать для таблицы),canal
Сразу после отлова измененного SQLslave
Выполнение узла, чтобы решить проблему задержки ведущий-ведомый.
Однако причин для синхронизации ведущий-подчиненный больше, чем это.Поскольку главный-подчиненный сервер существует между машинами и компьютерными комнатами, в дополнение к причинам пропускной способности сети, стабильность сети и синхронизация между машинами являются основными причинами синхронизации ведущий-подчиненный. следует рассматривать.
Суммировать
В этой статье просто реализована функция мониторинга каналов по базе данных, и ее цель — предоставить вам способ решения проблемы.
Организовал сотни различных технических электронных книг и видеоматериалов,
嘘~
,免费
Отправить, ответить в официальном аккаунте【666
] Самовывоз. Я построил один с моими друзьями技术交流群
, вместе обсуждайте технологии и делитесь технической информацией, стремясь вместе учиться и совершенствоваться, присоединяйтесь к нам, если вам это интересно!