предисловие
В предыдущей статье мы узнали, что канал может воспринимать изменения данных из MySQL. Это связано с тем, что он имитирует протокол взаимодействия подчиненного устройства MySQL, маскируя себя под подчиненное устройство MySQL, тем самым реализуя репликацию ведущий-ведомый.
Зная это, у меня есть два вопроса, которые преследуют меня:
- Как он имитирует протокол взаимодействия с ведомым устройством MySQL?
- Это как разрешить binlog войти в него?
Сегодня автор сосредоточится на этих двух вопросах, копаясь в коде канала, чтобы выяснить это.
1. Репликация MySQL master-slave
Прежде чем говорить о канале, необходимо вернуться к принципу репликации MySQL master-slave.
Процесс суммирования приведенного выше рисунка выглядит следующим образом:
- Мастер MySQL записывает изменения данных в двоичный журнал (двоичный журнал, где записи называются событиями двоичного журнала);
- MySQL Slave копирует бинарный журнал Master в своем журнале реле;
- Ведомое устройство MySQL сбрасывает события в журнале реле, отражая изменения данных в своей базе данных.
2. Принцип канала
Вышеприведенная картина ярко описывает роль канала. Его принцип также очень прост:
- canal имитирует интерактивный протокол подчиненного устройства mysql, притворяется подчиненным устройством mysql и отправляет протокол дампа ведущему устройству mysql;
- Ведущее устройство mysql получает запрос на создание дампа и начинает передавать двоичный журнал подчиненному устройству (то есть каналу);
- canal анализирует двоичный объект журнала (оригинал — поток байтов);
- canal распределяет проанализированные объекты в MySQL, RocketMQ или ES в соответствии с бизнес-сценарием.
3. Запустите исходный код
После прочтения принципа репликации master-slave и канала MySQL, чтобы облегчить отладку, автор разветвил исходный код на GitHub и импортировал его локально.
можно найтиcom.alibaba.otter.canal.deployer.CanalLauncher
класс, который является начальным классом, с которого начинается автономная версия канала.
Здесь канал запускается прямым запуском основного метода, а в/canal/bin/startup.sh
такой же эффект.
На самом деле, канал имеет много кода и разделен на множество модульных конструкций в архитектуре, таких как анализатор событий, потребление событий, хранение в памяти, экземпляр службы, метаданные, высокая доступность и т. д.
В этой статье нет цели представить каждую реализацию во всех подробностях, поэтому необходимо серьезно написать серию каналов. В основном по двум вопросам, которые мы подняли в начале.
4. Как имитировать ведомого?
Мы сказали выше,CanalLauncher
Это начальный класс, с которого начинается канал.
После запуска основного метода канал сначала выполняет большую подготовку. Например, загрузка файлов конфигурации, инициализация очередей сообщений, запуск администратора канала, загрузка конфигурации Spring, регистрация программ-ловушек и т. д.
канал имитирует подчиненный протокол, который находится вEventParser
запустил в модуле.
В коде канала весь процесс упрощается следующим образом:
// 开始执行replication
// 1. 构造Erosa连接
ErosaConnection erosaConnection = buildErosaConnection();
// 2. 启动一个心跳线程
startHeartBeat(erosaConnection);
// 3. 执行dump前的准备工作
preDump(erosaConnection);
erosaConnection.connect();// 链接
// 查询master serverId
long queryServerId = erosaConnection.queryServerId();
if (queryServerId != 0) {
serverId = queryServerId;
}
// 4. 获取binlog最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
// 加载元数据
processTableMeta(startPosition);
// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();
// 4. 开始dump数据
erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);
1, рукопожатие, проверка
Прежде чем начать, канал должен сначала установить соединение с сервером MySQL и завершить аутентификацию клиента.
В MySQL протокол процесса подключения выглядит следующим образом:
В коде смотрим способ его подключения:
в,negotiate
Методы — это конкретные реализации протокола рукопожатия и аутентификации клиента. Это соответствует спецификации протокола MySQL, созданной вышеуказаннымSocket channel
для чтения и записи сетевых данных.
2. Подготовка перед сбросом
После правильного подключения к MySQL перед запуском команды дампа инициализируйте некоторую информацию о конфигурации.
Идея состоит в том, чтобы выполнять операторы SQL и получать информацию через исполнитель MySQL.
Код не липкий, но операторы, которые они выполняют, следующие:
show variables like 'binlog_format' #获取binlog format格式
show variables like 'binlog_row_image' #获取binlog image格式
show variables like 'server_id' #获取matser serverId
show master status #获取binlog名称和position
3. Зарегистрируйте подчиненное устройство
позвони сейчасerosaConnection.dump(binlogfilename,binlogPosition,func)
способ зарегистрировать ведомое устройство и отправить команду дампа.
в настоящее время используетCOM_BINLOG_DUMP
Для отправки перед запросом событий binlog зарегистрируйте подчиненный сервер на главном сервере, и его инструкцииCOM_REGISTER_SLAVE
.
После завершения регистрации отправляется запрос дампа, и его командаCOM_BINLOG_DUMP
.
После выполнения этого кода мы передаемshow processlist;
Глядя на процесс, вы можете увидеть состояние потока дампа.
id | user | host | db | command | time | state |
---|---|---|---|---|---|---|
139 | canal | localhost:62901 | null | Binlog Dump | 3 | Master has sent all binlog to slave; waiting for more updates |
5. Как парсить данные бинлога?
В приведенной выше главе мы видели, что главный сервер MySQL принял подчиненный сервер канала, затем, когда канал получает содержимое binlog, И как его разобрать?
Во-первых, помните, что при настройке сервера MySQL мыbinlog-format
Установите режим ROW, который представляет собой репликацию на основе строк.
Каждое изменение данных в бинлоге можно назвать событием, в режиме ROW существует несколько основных типов событий:
событие | SQL команда | содержимое строк |
---|---|---|
TABLE_MAP_EVENT | null | Определите таблицу, которая будет изменена. |
WRITE_ROWS_EVENT | вставлять | данные строки для вставки |
DELETE_ROWS_EVENT | Удалить | удаленные данные |
UPDATE_ROWS_EVENT | возобновить | исходные данные + данные для изменения |
Каждый раз, когда данные изменяются, будут инициированы два события, сначала сообщая вам информацию о таблице, которая должна быть изменена, а затем сообщая вам об измененном содержимом строки.
НапримерTABLE_MAP_EVENT + WRITE_ROWS_EVENT
.
После того, как канал получает данные бинлога, он не сразу парсит их в привычные данные JSON, а запускается при их отправке.
Например, мы решили использоватьRocketMQ
, то массив байтов в бинлоге перед отправкой конвертируется в объект.
// 并发构造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
// 串行分区
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
В этих двух методах преобразование массива байтов в объект завершено. конвертирован вFlatMessage
Объект становится структурой данных, которую мы потребляем в очереди сообщений.
public class FlatMessage implements Serializable {
private long id;
private String database;
private String table;
private List<String> pkNames;
private Boolean isDdl;
private String type;
// binlog executeTime
private Long es;
// dml build timeStamp
private Long ts;
private String sql;
private Map<String, Integer> sqlType;
private Map<String, String> mysqlType;
private List<Map<String, String>> data;
private List<Map<String, String>> old;
}
Суммировать
Как я сказал в начале этой статьи, когда я впервые узнал о механизме канала, я действительно почувствовал, что это невероятно.
А как он имитирует ведомое устройство MySQL? Мне всегда интересно, есть ли в этом какая-то черная технология. . .
На самом деле это связано с нежеланием автора MySQL.
MySQL уже сформулировал различные интерфейсные протоколы, и как подключиться, проверять, регистрацию и дамп, четко написано там.
Это именно то, о чем говорится в предложении: цветы расцветают как раз, просто подожди, пока ты придешь~