Углубленный анализ промежуточного ПО - Canal

задняя часть MySQL сервер ZooKeeper

canal: Инкрементная подписка и компонент потребления binlog базы данных Alibaba mysql.

MySQL binlog

Репликация master-slave MySQL

сервер mysql изменить конфигурацию и перезапустить

$ vi /etc/my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

$ mysql -uroot
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

$ sudo service mysqld start
Вопрос: С какой целью создается пользователь канала? Можно ли использовать существующее имя пользователя, например root.
Ответ: Некоторые пользователи не имеют полномочий REPLICATION SLAVE, REPLICATION CLIENT.При подключении к каналу с этими пользователями бинлог не может быть получен.
Здесь пользователь канала авторизовал все разрешения, поэтому клиент может получить бинарный журнал из канала.

Проясняются две концепции: сервер канала подключается к mysql, а клиент подключается к серверу канала.

  • canal относится к серверу канала, который читает бинарный журнал mysql и сохраняет его после синтаксического анализа.
  • Клиент обращается к бинлогу, который потребляет канальный сервер

Машина подключается к серверу и проверяет, что формат binlog — ROW.

$ mysql -h192.168.6.52 -ucanal -pcanal
mysql> show variables like '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+

Принцип репликации master-slave mysql:

  • Мастер записывает изменения в двоичный журнал;
  • Подчиненное устройство копирует события двоичного журнала ведущего в свой журнал ретрансляции;
  • Ведомый повторяет события в журнале реле, изменяя данные, чтобы они отражали его собственные.

binlog

Перед запуском канала давайте сначала разберемся, что такое бинлог mysql:

mysql> show binlog events;
| Log_name         | Pos   | Event_type  | Server_id | End_log_pos | Info                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+------------------+-------+-------------+-----------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| mysql-bin.000001 |     4 | Format_desc |         1 |         106 | Server ver: 5.1.73-log, Binlog ver: 4                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| mysql-bin.000001 |   106 | Query       |         1 |        1864 | use `mysql`; CREATE TABLE IF NOT EXISTS db (   Host char(60) binary DEFAULT '' NOT NULL, Db char(64) binary DEFAULT '' NOT NULL, User char(16) binary DEFAULT '' NOT NULL, Select_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Insert_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Update_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Delete_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Drop_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Grant_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, References_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Index_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_tmp_table_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Lock_tables_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Show_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Execute_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Event_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Trigger_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, PRIMARY KEY Host (Host,Db,User), KEY User (User) ) engine=MyISAM CHARACTER SET utf8 COLLATE utf8_bin comment='Database privileges' |
| mysql-bin.000001 |  1864 | Query       |         1 |        3518 | use `mysql`; CREATE TABLE IF NOT EXISTS host (  Host char(60) binary DEFAULT '' NOT NULL, Db char(64) binary DEFAULT '' NOT NULL, Select_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Insert_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Update_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Delete_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Drop_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Grant_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, References_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Index_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_tmp_table_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Lock_tables_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Show_view_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Create_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Alter_routine_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Execute_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, Trigger_priv enum('N','Y') COLLATE utf8_general_ci DEFAULT 'N' NOT NULL, PRIMARY KEY Host (Host,Db) ) engine=MyISAM CHARACTER SET utf8 COLLATE utf8_bin comment='Host privileges;  Merged with database privileges' |

Файл binlog mysql-bin.xxx и индексный файл будут сгенерированы в файле данных mysql.

[qihuang.zheng@dp0652 canal]$ ll /var/lib/mysql/
总用量 26228
drwx------ 2 mysql mysql     4096 10月 11 14:05 canal_test
-rw-rw---- 1 mysql mysql 10485760 9月  30 22:12 ibdata1
-rw-rw---- 1 mysql mysql  5242880 10月 11 09:57 ib_logfile0
-rw-rw---- 1 mysql mysql  5242880 10月 11 09:57 ib_logfile1
drwx------ 2 mysql mysql     4096 8月   2 11:01 mysql
-rw-rw---- 1 mysql mysql    18451 8月   2 11:01 mysql-bin.000001
-rw-rw---- 1 mysql mysql   929226 8月   2 11:01 mysql-bin.000002
-rw-rw---- 1 mysql mysql  4890698 9月  30 22:12 mysql-bin.000003
-rw-rw---- 1 mysql mysql      897 10月 11 14:06 mysql-bin.000004
-rw-rw---- 1 mysql mysql       76 10月 11 09:57 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 10月 11 09:57 mysql.sock

Операции с mysql будут иметь двоичные события, записанные в файле binlog. Некоторые из следующих операций включают создание пользователя, авторизацию, создание базы данных, создание таблицы и вставку записи.

[qihuang.zheng@dp0652 canal]$ sudo strings /var/lib/mysql/mysql-bin.000004
5.1.73-log
CREATE USER canal IDENTIFIED BY 'canal'
root    localhost
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'
FLUSH PRIVILEGES
canal_test
create database canal_test    ===》创建数据库
canal_test
create table test (   uid int (4) primary key not null auto_increment,   name varchar(10) not null)  ==》创建表
canal_test
BEGIN     ==》插入记录,这里有事务。但是没有把具体的语句打印出来
canal_test
test
canal_test
COMMIT

Canal QuickStart

canal & config

Разверните сервер канала на 6.52 и запустите его. Посмотреть лог канала:

[qihuang.zheng@dp0652 canal]$ cat logs/canal/canal.log
2017-10-11 11:31:52.076 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-11 11:31:52.151 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11111]
2017-10-11 11:31:52.644 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Просмотрите журнал экземпляра:

[qihuang.zheng@dp0652 canal]$ cat logs/example/example.log
2017-10-11 11:31:52.435 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-11 11:31:52.444 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-11 11:31:52.587 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-11 11:31:52.599 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-11 11:31:52.679 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

В конфиге канала канала есть несколько конфигурационных файлов.

➜  canal.deployer-1.0.24 tree conf
conf
├── canal.properties
├── example
│   └── instance.properties
├── logback.xml
└── spring
    ├── default-instance.xml
    ├── file-instance.xml
    ├── group-instance.xml
    ├── local-instance.xml
    └── memory-instance.xml

Давайте сначала посмотрим на первые четыре элемента конфигурации общего свойства canal.properties:

canal.id= 1
canal.ip=
canal.port= 11111
canal.zkServers=

canal.id — это номер канала.В кластерной среде идентификаторы разных каналов отличаются.Обратите внимание, что они отличаются от server_id в mysql.
ip здесь не указан, по умолчанию стоит машина, например выше указан 192.168.6.52, а номер порта 11111. zk используется для кластера каналов.

Взгляните на конфигурации, связанные с пунктами назначения в canal.properties:

#################################################
#########       destinations        ############# 
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.global.mode = spring 
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

Здесь canal.destinations = example может быть задано несколько, например, example1, example2,
Вам нужно создать две соответствующие папки, и в каждой папке есть файл instance.properties.

Spring используется для глобального управления экземплярами канала.Файл-instance.xml здесь в конечном итоге создаст экземпляры всех целевых экземпляров:

<bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
    <property name="ignoreResourceNotFound" value="true" />
    <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
    <property name="locationNames">
        <list>
            <value>classpath:canal.properties</value>
            <value>classpath:${canal.instance.destination:}/instance.properties</value>
        </list>
    </property>
</bean>
<bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
    <property name="destination" value="${canal.instance.destination}" />
    <property name="eventParser"><ref local="eventParser" /></property>
    <property name="eventSink"><ref local="eventSink" /></property>
    <property name="eventStore"><ref local="eventStore" /></property>
    <property name="metaManager"><ref local="metaManager" /></property>
    <property name="alarmHandler"><ref local="alarmHandler" /></property>
</bean>

Например, если canal.instance.destination равно example, будет загружен файл конфигурации example/instance.properties.

Конфигурационный файл instance.properties в примере изменять не нужно. Сервер канала может запускать несколько экземпляров канала.

#################################################
## mysql serverId,这里的slaveId不能和myql集群中已有的server_id一样
canal.instance.mysql.slaveId = 1234

# position info 这里连接的是mysql master的地址。
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =  
#################################################

simple client

Создайте базу данных на mysql, создайте таблицу, вставьте запись, а затем измените запись.

create database canal_test;
use canal_test;
create table test (   uid int (4) primary key not null auto_increment,   name varchar(10) not null);
insert into test (name) values('10');

Измените информацию о соединении в тестовом примере клиента. Пример соответствует имени экземпляра канала.

String destination = "example";
CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("192.168.6.52", 11111), destination, "canal", "canal");

Примечание. Если в соединении возникает ошибка, тестовый пример клиента немедленно завершается, печатая ## stop the canal client. Обычно терминал не закрывается, он продолжает работать.

Результат консоли SimpleCanalClientTest выглядит следующим образом:

****************************************************
* Batch Id: [1] ,count : [2] , memsize : [263] , Time : 2017-10-11 14:06:06
* Start : [mysql-bin.000004:396:1507701897000(2017-10-11 14:04:57)] 
* End : [mysql-bin.000004:491:1507701904000(2017-10-11 14:05:04)] 
****************************************************

----------------> binlog[mysql-bin.000004:396] , name[canal_test,] , eventType : QUERY , executeTime : 1507701897000 , delay : 69710ms
 sql ----> create database canal_test

----------------> binlog[mysql-bin.000004:491] , name[canal_test,test] , eventType : CREATE , executeTime : 1507701904000 , delay : 62723ms
 sql ----> create table test (   uid int (4) primary key not null auto_increment,   name varchar(10) not null)

Вставьте запись: (где и uid, и обновление имени равны истине)

****************************************************
* Batch Id: [2] ,count : [3] , memsize : [186] , Time : 2017-10-11 14:06:32
* Start : [mysql-bin.000004:659:1507701989000(2017-10-11 14:06:29)] 
* End : [mysql-bin.000004:822:1507701989000(2017-10-11 14:06:29)] 
****************************************************

================> binlog[mysql-bin.000004:659] , executeTime : 1507701989000 , delay : 3142ms
 BEGIN ----> Thread id: 11
----------------> binlog[mysql-bin.000004:785] , name[canal_test,test] , eventType : INSERT , executeTime : 1507701989000 , delay : 3154ms
uid : 1    type=int(4)    update=true
name : 10    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:822] , executeTime : 1507701989000 , delay : 3179ms

Измените запись: (где обновление имени равно true)

****************************************************
* Batch Id: [3] ,count : [3] , memsize : [202] , Time : 2017-10-11 14:49:11
* Start : [mysql-bin.000004:897:1507704547000(2017-10-11 14:49:07)] 
* End : [mysql-bin.000004:1076:1507704547000(2017-10-11 14:49:07)] 
****************************************************

================> binlog[mysql-bin.000004:897] , executeTime : 1507704547000 , delay : 4048ms
 BEGIN ----> Thread id: 13
----------------> binlog[mysql-bin.000004:1023] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507704547000 , delay : 4059ms
uid : 1    type=int(4)
name : zqhxuyuan    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1076] , executeTime : 1507704547000 , delay : 4096ms

В дополнение к example.log есть еще meta.log под экземпляром примера под инсталляционным пакетом канала

[qihuang.zheng@dp0652 canal]$ cat logs/example/meta.log
2017-10-11 14:06:03.728 - clientId:1001 cursor:[mysql-bin.000004,396,1507701897000] address[/127.0.0.1:3306]
2017-10-11 14:06:04.589 - clientId:1001 cursor:[mysql-bin.000004,491,1507701904000] address[localhost/127.0.0.1:3306]
2017-10-11 14:06:29.589 - clientId:1001 cursor:[mysql-bin.000004,822,1507701989000] address[localhost/127.0.0.1:3306]
2017-10-11 14:49:08.589 - clientId:1001 cursor:[mysql-bin.000004,1076,1507704547000] address[localhost/127.0.0.1:3306]

Cannal Internal Overview

canal client & server

Связь между клиентом канала и сервером канала осуществляется в режиме C/S.Клиент принимает NIO, а сервер использует Netty.
После запуска сервера канала, если нет клиента канала, сервер канала не будет обращаться к mysql для извлечения бинлога.

try {
    connector.connect();
    connector.subscribe();
    while (running) {
        Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
        long batchId = message.getId();
        int size = message.getEntries().size();
        printSummary(message, batchId, size);
        printEntry(message.getEntries());
        connector.ack(batchId); // 提交确认
        connector.rollback(batchId); // 处理失败, 回滚数据
    }
} finally {
    connector.disconnect();
}

Отношения между клиентом канала и сервером канала относятся к возрастающей подписке/потреблению Блок-схема выглядит следующим образом: (сторона C — клиент канала, сторона S — сервер канала)

Когда клиент канала вызывает метод connect(), тип PacketType.HANDSHAKE, а затем записывается CLIENTAUTHENTICATION. Затем вызовите метод subscribe() типа SUBSCRIPTION.

Соответствующий сервер использует netty для обработки запросов RPC (CanalServerWithNetty):

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipelines = Channels.pipeline();
        pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
        // 处理客户端的HANDSHAKE请求
        pipelines.addLast(HandshakeInitializationHandler.class.getName(),
            new HandshakeInitializationHandler(childGroups));
        // 处理客户端的CLIENTAUTHENTICATION请求
        pipelines.addLast(ClientAuthenticationHandler.class.getName(),
            new ClientAuthenticationHandler(embeddedServer));

        // 处理客户端的会话请求,包括SUBSCRIPTION,GET等
        SessionHandler sessionHandler = new SessionHandler(embeddedServer);
        pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
        return pipelines;
    }
});

После того, как ClientAuthenticationHandler обрабатывает аутентификацию, HandshakeInitializationHandler и ClientAuthenticationHandler удаляются.

Взяв клиент, отправляющий GET, сервер, получающий бинарный журнал от mysql, и возвращающий СООБЩЕНИЯ клиенту в качестве примера, чтобы проиллюстрировать процесс взаимодействия rpc между клиентом и сервером:

SimpleCanalConnector отправляет запрос GET и считывает процесс результата ответа:

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
    waitClientRunning();
    int size = (batchSize <= 0) ? 1000 : batchSize;
    long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制
    if (unit == null) unit = TimeUnit.MILLISECONDS;

    // client发送GET请求
    writeWithHeader(Packet.newBuilder()
        .setType(PacketType.GET)
        .setBody(Get.newBuilder()
            .setAutoAck(false)
            .setDestination(clientIdentity.getDestination())
            .setClientId(String.valueOf(clientIdentity.getClientId()))
            .setFetchSize(size)
            .setTimeout(time)
            .setUnit(unit.ordinal())
            .build()
            .toByteString())
        .build()
        .toByteArray());
    // client获取GET结果    
    return receiveMessages();
}

private Message receiveMessages() throws IOException {
    // 读取server发送的数据包
    Packet p = Packet.parseFrom(readNextPacket());
    switch (p.getType()) {
        case MESSAGES: {
            Messages messages = Messages.parseFrom(p.getBody());
            Message result = new Message(messages.getBatchId());
            for (ByteString byteString : messages.getMessagesList()) {
                result.addEntry(Entry.parseFrom(byteString));
            }
            return result;
        }
    }
}

Сервер SessionHandler обрабатывает процесс запроса GET, отправленный клиентом:

case GET:
    // 读取客户端发送的数据包,封装为Get对象
    Get get = CanalPacket.Get.parseFrom(packet.getBody());
    // destination表示canal instance
    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {
        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));
        Message message = null;
        if (get.getTimeout() == -1) {// 是否是初始值
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
        } else {
            TimeUnit unit = convertTimeUnit(get.getUnit());
            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
        }
        // 设置返回给客户端的数据包类型为MESSAGES   
        Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
        packetBuilder.setType(PacketType.MESSAGES);
        // 构造Message
        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
        messageBuilder.setBatchId(message.getId());
        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {
            for (Entry entry : message.getEntries()) {
                messageBuilder.addMessages(entry.toByteString());
            }
        }
        packetBuilder.setBody(messageBuilder.build().toByteString());
        // 输出数据,返回给客户端
        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
    }

Введение в протокол get/ack/rollback:

  • Message getWithoutAck(int batchSize), позволяет указать размер партии, вы можете получить несколько частей за раз, каждый раз возвращается объект Message, а содержимое:
    – уникальный идентификатор батча
    – записи определенного объекта данных, соответствующий формат объекта данных:EntryProtocol.proto
  • void rollback(long batchId), Gu Mingsi, откатывает последний запрос на получение и снова извлекает данные. Отправить на основе идентификатора партии, полученного с помощью get, чтобы избежать неправильной работы
  • void ack(long batchId), Guming Siyi, подтверждает, что потребление прошло успешно, и информирует сервер о необходимости удалить данные. Отправить на основе идентификатора партии, полученного с помощью get, чтобы избежать неправильной работы

Структура сообщения канала, соответствующая EntryProtocol.protod, выглядит следующим образом:

Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里记录变更发生的时间戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete类型]  
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    storeValue  [byte数据,可展开,对应的类型为RowChange]  
      
RowChange  
    isDdl       [是否是ddl变更操作,比如create table/drop table]  
    sql         [具体的ddl sql]  
    rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]  
        beforeColumns [Column类型的数组,变更前的数据字段]  
        afterColumns [Column类型的数组,变更后的数据字段]  
          
Column   
    index         
    sqlType     [jdbc type]  
    name        [column name]  
    isKey       [是否为主键]  
    updated     [是否发生过变更]  
    isNull      [值是否为null]  
    value       [具体的内容,注意为string文本]

В SessionHandler, когда сервер обрабатывает другие типы запросов от клиента, он будет вызывать соответствующие методы CanalServerWithEmbedded:

case SUBSCRIPTION:
        Sub sub = Sub.parseFrom(packet.getBody());
        embeddedServer.subscribe(clientIdentity);
case GET:
        Get get = CanalPacket.Get.parseFrom(packet.getBody());
        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
case CLIENTACK:
        ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());
        embeddedServer.ack(clientIdentity, ack.getBatchId());
case CLIENTROLLBACK:
        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());
        embeddedServer.rollback(clientIdentity);// 回滚所有批次

CanalServerWithEmbedded

CanalServer содержит несколько экземпляров, а его переменная-член canalInstances записывает отношение сопоставления между именами экземпляров и экземплярами.
Поскольку это карта, экземпляры с одним и тем же именем не допускаются на одном сервере.Например, на одном сервере не может быть двух экземпляров одновременно.

public class CanalServerWithEmbedded extends AbstractCanalLifeCycle implements CanalServer, CanalService {
    private Map<String, CanalInstance> canalInstances;
    private CanalInstanceGenerator     canalInstanceGenerator;
}

На следующем рисунке показано, что у сервера есть два экземпляра, и каждый клиент подключен к экземпляру.
Каждый экземпляр Canal моделируется как ведомое устройство MySQL, поэтому slaveId каждого экземпляра должен быть другим. Например, идентификаторы двух экземпляров на рисунке равны 1234 и 1235 соответственно.

Обратите внимание, что каждый клиент Canal здесь соответствует экземпляру, и каждый клиент будет указывать пункт назначения при запуске, и этот пункт назначения представляет имя экземпляра.
Поэтому, когда CanalServerWithEmbedded обрабатывает различные запросы, в параметрах есть ClientIdentity.Получите назначение из ClientIdentity, и вы сможете получить соответствующий CanalInstance

Ниже приведен пример метода подписки CanalServerWithEmbedded:

public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    // ClientIdentity表示Canal Client客户端,从中可以获取出客户端指定连接的Destination
    // 由于CanalServerWithEmbedded记录了每个Destination对应的Instance,可以获取客户端对应的Instance
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start(); // 启动Instance的元数据管理器
    }
    canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) {
        position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
        }
    }
    // 通知下订阅关系变化
    canalInstance.subscribeChange(clientIdentity);
}

Каждый CanalInstance включает четыре компонента: EventParser, EventSink, EventStore и MetaManager.

Основные методы обработки на стороне сервера включают get/ack/rollback, Эти три метода будут использовать несколько внутренних компонентов выше Instance, в основном EventStore и MetaManager:

Перед этим мы должны сначала понять значение EventStore.EventStore — это RingBuffer с тремя указателями: Put, Get и Ack.

  • Put: после того, как Canal Server извлекает данные из MySQL, они помещаются в память, а значение Put увеличивается.
  • Get: Потребитель (Canal Client) потребляет данные из памяти, Get увеличивает
  • Ack: потребление потребителя завершено, и Ack увеличивается. И удалит данные, которые были Ack in Put

Связь между этими тремя операциями и компонентом Instance следующая:

Клиент может получить бинарный журнал mysql через сервер канала несколькими способами (метод get и getWithoutAck):

  • Если тайм-аут равен нулю, метод tryGet используется для его немедленного получения.
  • если тайм-аут не равен нулю
    1. Если тайм-аут равен 0, метод блокировки get используется для получения данных без установки тайм-аута, и он не вернется, пока не будет достаточно данных batchSize.
    2. Если тайм-аут не равен 0, для получения данных используется метод get+timeout. По истечении тайм-аута не хватает данных в batchSize. Сколько возвращается?
private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout,
                                TimeUnit unit) {
    if (timeout == null) {
        return eventStore.tryGet(start, batchSize); // 即时获取
    } else if (timeout <= 0){
        return eventStore.get(start, batchSize); // 阻塞获取
    } else {
        return eventStore.get(start, batchSize, timeout, unit); // 异步获取
    }
}

Примечание. В реализации EventStore используется кольцевой буфер RingBuffer, подобный Disruptor. Класс реализации RingBuffer — MemoryEventStoreWithBuffer.

Разница между методом get и методом getWithoutAck заключается в следующем:

  • Метод get немедленно вызовет ack
  • метод getWithoutAck не вызывает ack

EventStore

В качестве примера возьмем 10 фрагментов данных, изначально текущий = -1, первый элемент начинается следующим = 0, конец = 9, цикл[0,9]Все элементы.
Элементы списка: (A,B,C,D,E,F,G,H,I,J)

next entries[next] next-current-1 list element
0 entries[0] 0-(-1)-1=0 A
1 entries[1] 1-(-1)-1=1 B
2 entries[2] 2-(-1)-1=2 C
3 entries[3] 3-(-1)-1=3 D
. ……. ……. .
9 entries[9] 9-(-1)-1=9 J

После размещения первой партии из 10 элементов для putSequence устанавливается значение end=9. Допустим, вторая партия кладет еще 5 элементов: (K, L, M, N, O)

current=9, start next=9+1=10, end=9+5=14, после завершения Put для putSequence устанавливается значение end=14.

next entries[next] next-current-1 list element
10 entries[10] 10-(9)-1=0 K
11 entries[11] 11-(9)-1=1 L
12 entries[12] 12-(9)-1=2 M
13 entries[13] 13-(9)-1=3 N
14 entries[14] 14-(9)-1=3 O

Если предположить, что максимальный размер кольцевого буфера равен 15 (16 МБ в исходном коде), то два вышеупомянутых пакета генерируют в общей сложности 15 элементов, которые просто заполняют кольцевой буфер.
Если приходит другое событие Put из-за того, что кольцевой буфер заполнен и нет доступного слота, операция Put будет заблокирована до тех пор, пока она не будет использована.

Ниже приведен код для заполнения кольцевого буфера Put, проверки доступных слотов (метод checkFreeSlotAt) в нескольких методах put.

public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
    private static final long INIT_SQEUENCE = -1;
    private int               bufferSize    = 16 * 1024;
    private int               bufferMemUnit = 1024;                         // memsize的单位,默认为1kb大小
    private int               indexMask;
    private Event[]           entries;

    // 记录下put/get/ack操作的三个下标
    private AtomicLong        putSequence   = new AtomicLong(INIT_SQEUENCE); // 代表当前put操作最后一次写操作发生的位置
    private AtomicLong        getSequence   = new AtomicLong(INIT_SQEUENCE); // 代表当前get操作读取的最后一条的位置
    private AtomicLong        ackSequence   = new AtomicLong(INIT_SQEUENCE); // 代表当前ack操作的最后一条的位置

    // 启动EventStore时,创建指定大小的缓冲区,Event数组的大小是16*1024
    // 也就是说算个数的话,数组可以容纳16000个事件。算内存的话,大小为16MB
    public void start() throws CanalStoreException {
        super.start();
        indexMask = bufferSize - 1;
        entries = new Event[bufferSize];
    }

    // EventParser解析后,会放入内存中(Event数组,缓冲区)
    private void doPut(List<Event> data) {
        long current = putSequence.get(); // 取得当前的位置,初始时为-1,第一个元素为-1+1=0
        long end = current + data.size(); // 最末尾的位置,假设Put了10条数据,end=-1+10=9
        // 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值
        for (long next = current + 1; next <= end; next++) {
            entries[getIndex(next)] = data.get((int) (next - current - 1));
        }
        putSequence.set(end);
    } 
}

Put — это данные о производстве, Get — это данные о потреблении, а Get не должен превышать Put. Например, если вы поместите 10 фрагментов данных, Get может получить не более 10 фрагментов данных. Но иногда, чтобы обеспечить скорость обработки Get, Put и Get не равны.
Вы можете думать о Put как о производителе, а о Get как о потребителе. Производители могут быть быстрыми, а потребители могут потреблять медленно. Например, Поместите 1000 фрагментов данных и Получите, нам нужно каждый раз обрабатывать только 10 фрагментов данных.

Предыдущий пример все еще используется для иллюстрации процесса Get.Первоначально current=-1, предполагая, что два пакета данных помещаются в общей сложности 15 штук, maxAbleSequence=14, а BatchSize Get принимается равным 10.
Первоначально next=current=-1, end=-1. По startPosition будет установлено next=0. Наконец, end присваивается значение 9, то есть циклический буфер [0,9] имеет всего 10 элементов.

private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
    LogPosition startPosition = (LogPosition) start;

    long current = getSequence.get();
    long maxAbleSequence = putSequence.get();
    long next = current;
    long end = current;
    // 如果startPosition为null,说明是第一次,默认+1处理
    if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录
        next = next + 1;
    }

    end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
    // 提取数据并返回
    for (; next <= end; next++) {
        Event event = entries[getIndex(next)];
        if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) {
            // 如果是ddl隔离,直接返回
            if (entrys.size() == 0) {
                entrys.add(event);// 如果没有DML事件,加入当前的DDL事件
                end = next; // 更新end为当前
            } else {
                // 如果之前已经有DML事件,直接返回了,因为不包含当前next这记录,需要回退一个位置
                end = next - 1; // next-1一定大于current,不需要判断
            }
            break;
        } else {
            entrys.add(event);
        }
    }
    // 处理PositionRange,然后设置getSequence为end
    getSequence.compareAndSet(current, end)
}

Верхним пределом операции подтверждения является получение.Предполагая, что 15 элементов данных были отправлены и 10 элементов данных были получены, только 10 элементов данных могут быть подтверждены максимум. Целью Ack является очистка данных, которые были получены в буфере.

public void ack(Position position) throws CanalStoreException {
    cleanUntil(position);
}

public void cleanUntil(Position position) throws CanalStoreException {
    long sequence = ackSequence.get();
    long maxSequence = getSequence.get();

    boolean hasMatch = false;
    long memsize = 0;
    for (long next = sequence + 1; next <= maxSequence; next++) {
        Event event = entries[getIndex(next)];
        memsize += calculateSize(event);
        boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
        if (match) {// 找到对应的position,更新ack seq
            hasMatch = true;

            if (batchMode.isMemSize()) {
                ackMemSize.addAndGet(memsize);
                // 尝试清空buffer中的内存,将ack之前的内存全部释放掉
                for (long index = sequence + 1; index < next; index++) {
                    entries[getIndex(index)] = null;// 设置为null
                }
            }

            ackSequence.compareAndSet(sequence, next)
        }
    }
}

Реализация метода отката rollback относительно проста, он возвращает getSequence в позицию подтверждения.

public void rollback() throws CanalStoreException {
    getSequence.set(ackSequence.get());
    getMemSize.set(ackMemSize.get());
}

На следующем рисунке показаны несколько примеров работы RingBuffer:

EventParser WorkFlow

EventStore отвечает за хранение проанализированных событий Binlog, а действие парсинга отвечает за извлечение Binlog.Этот процесс более сложен. Требуется взаимодействие с MetaManager.
Например, необходимо записывать позицию каждой тяги, чтобы в следующий раз можно было продолжить тягу с последней позиции предыдущего раза. Таким образом, MetaManager должен иметь состояние.

Процесс EventParser выглядит следующим образом:

  1. Соединение получает местоположение, в котором последний синтаксический анализ был успешным (если оно запускается в первый раз, оно получает изначально указанное местоположение или местоположение binlog текущей базы данных)
  2. Соединение устанавливает связь и отправляет команду BINLOG_DUMP.
  3. Mysql начинает проталкивать Binaly Log
  4. Полученный бинарный журнал анализируется синтаксическим анализатором Binlog, и добавляется некоторая конкретная информация.
  5. Передается в модуль EventSink для хранения данных, это блокирующая операция до тех пор, пока сохранение не будет успешным.
  6. После успешного сохранения регулярно записывайте положение Binaly Log.

Упомянутая выше связь относится к реализацииErosaConnectionинтерфейсMysqlConnection.
EventParserКласс реализации реализованAbstractEventParserизMysqlEventParser.

EventParserPass после разбора binlogEventSinkнаписатьEventStore, эту ссылку можно соединить последовательно через метод put EventStore:

На самом деле есть еще и буфер EventTransactionBuffer, то есть после разбора парсером он кладется в буфер первым.
Когда происходит транзакция или данные превышают порог, выполняется операция обновления: то есть данные в буфере потребления помещаются в EventStore.
Этот буфер имеет два указателя смещения: putSequence и flushSequence.

Canal HA

Смоделируйте два сервера Canal на одном компьютере, скопируйте автономный режим в две папки и измените соответствующую конфигурацию.

canal_m/conf/canal.properties

canal.id= 2
canal.ip=
canal.port= 11112
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_m/conf/example/instance.properties

canal.instance.mysql.slaveId = 1235

canal_s

canal.id= 3
canal.ip=
canal.port= 11113
canal.zkServers=localhost:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal_s/conf/example/instance.properties

canal.instance.mysql.slaveId = 1236

начать canal_m

2017-10-12 14:51:45.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-12 14:51:45.776 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11112]
2017-10-12 14:51:46.687 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

начать canal_s

2017-10-12 14:52:18.999 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2017-10-12 14:52:19.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.6.52:11113]
2017-10-12 14:52:19.364 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

Мастер предоставляет услуги, в canal_m/logs/example/example.log есть логи, а в canal_s/logs нет папки example

[qihuang.zheng@dp0652 ~]$ tail -f canal_m/logs/example/example.log
2017-10-12 14:51:46.453 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-12 14:51:46.463 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-12 14:51:46.624 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-12 14:51:46.644 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-12 14:51:46.658 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

Посмотреть информацию, записанную Canal HA в ZK

[zk: 192.168.6.52:2181(CONNECTED) 7] ls /otter/canal/destinations/example/cluster
[192.168.6.52:11112, 192.168.6.52:11113]

[zk: 192.168.6.52:2181(CONNECTED) 10] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.6.52:11112","cid":2}

Запустите ClusterCanalClientTest примера

CanalConnector connector = CanalConnectors.newClusterConnector("192.168.6.52:2181", destination, "canal", "canal");

Выполнить SQL:update test set name = 'zqh' where uid=1;, консоль выводит журнал следующим образом:

****************************************************
* Batch Id: [1] ,count : [3] , memsize : [203] , Time : 2017-10-12 15:05:20
* Start : [mysql-bin.000004:1151:1507791918000(2017-10-12 15:05:18)] 
* End : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
****************************************************

================> binlog[mysql-bin.000004:1151] , executeTime : 1507791918000 , delay : 2080ms
 BEGIN ----> Thread id: 763
----------------> binlog[mysql-bin.000004:1277] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507791918000 , delay : 2092ms
uid : 1    type=int(4)
name : zqh    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1331] , executeTime : 1507791918000 , delay : 2130ms

Еще раз проверьте информацию о клиенте, записанную в ZK:

  • Экземпляр соответствует клиенту, где имя экземпляра — пример, а соответствующий номер клиента — 1001.
  • Чтобы убедиться, что Экземпляр действительно подключен указанным Клиентом, просмотрите порт 11112 на Сервере.
[zk: 192.168.6.52:2181(CONNECTED) 18] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:53942","clientId":1001}

[zk: 192.168.6.52:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},
"postion":{"included":false,"journalName":"mysql-bin.000004","position":1331,"serverId":1,"timestamp":1507791918000}} ==》serverId表示MySQL的server_id

[qihuang.zheng@dp0652 ~]$ netstat -anpt|grep 11112
tcp        0      0 0.0.0.0:11112               0.0.0.0:*                   LISTEN      27816/java   ==》Canal服务端
tcp        0     19 192.168.6.52:11112          10.57.241.44:53942          ESTABLISHED 27816/java   ==》Canal客户端

остановка canal_m

[qihuang.zheng@dp0652 canal_m]$ bin/stop.sh
dp0652: stopping canal 27816 ...
Oook! cost:1

Экземпляр будет запущен на ведомом узле, т.е. canal_s

[qihuang.zheng@dp0652 ~]$ tail -f canal_s/logs/example/example.log
2017-10-12 15:17:21.452 [New I/O server worker #1-1] ERROR com.alibaba.otter.canal.server.netty.NettyUtils - ErrotCode:400 , Caused by :
something goes wrong with channel:[id: 0x0c182149, /10.57.241.44:54008 => /192.168.6.52:11113], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first

2017-10-12 15:17:21.661 [pool-1-thread-1] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2017-10-12 15:17:21.663 [pool-1-thread-1] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2017-10-12 15:17:21.767 [pool-1-thread-1] WARN  org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
2017-10-12 15:17:21.968 [pool-1-thread-1] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2017-10-12 15:17:21.998 [pool-1-thread-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2017-10-12 15:17:22.071 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000004","position":1331,"serverId":1,"timestamp":1507791918000}}

После остановки canal_m остается только canal_s, поэтому кластер Canal имеет только один узел:

[zk: 192.168.6.52:2181(CONNECTED) 14] ls /otter/canal/cluster
[192.168.6.52:11113]

[zk: 192.168.6.52:2181(CONNECTED) 5] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.6.52:11113","cid":3}

В процессе переключения лог клиента

2017-10-12 15:17:22.524 [Thread-2] WARN  c.alibaba.otter.canal.client.impl.ClusterCanalConnector - failed to connect to:/192.168.6.52:11113 after retry 0 times
2017-10-12 15:17:22.529 [Thread-2] WARN  c.a.otter.canal.client.impl.running.ClientRunningMonitor - canal is not run any in node
2017-10-12 15:17:27.695 [Thread-2] INFO  c.alibaba.otter.canal.client.impl.ClusterCanalConnector - restart the connector for next round retry.

****************************************************
* Batch Id: [1] ,count : [1] , memsize : [75] , Time : 2017-10-12 15:17:27
* Start : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
* End : [mysql-bin.000004:1331:1507791918000(2017-10-12 15:05:18)] 
****************************************************
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1331] , executeTime : 1507791918000 , delay : 729763ms

Выполните оператор SQL еще раз

****************************************************
* Batch Id: [2] ,count : [3] , memsize : [198] , Time : 2017-10-12 15:20:56
* Start : [mysql-bin.000004:1406:1507792855000(2017-10-12 15:20:55)] 
* End : [mysql-bin.000004:1581:1507792855000(2017-10-12 15:20:55)] 
****************************************************

================> binlog[mysql-bin.000004:1406] , executeTime : 1507792855000 , delay : 1539ms
 BEGIN ----> Thread id: 763
----------------> binlog[mysql-bin.000004:1532] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507792855000 , delay : 1539ms
uid : 1    type=int(4)
name : zqhx    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1581] , executeTime : 1507792855000 , delay : 1540ms

После остановки клиента запросите информацию о клиенте в ZK. Обратите внимание, что информация о курсоре все еще есть, но не работает, потому что у экземпляра нет соответствующего клиента.

[zk: 192.168.6.52:2181(CONNECTED) 1] ls /otter/canal/destinations/example
[running, cluster, 1001]

[zk: 192.168.6.52:2181(CONNECTED) 0] ls /otter/canal/destinations/example/1001
[cursor]

[zk: 192.168.6.52:2181(CONNECTED) 6] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition",
"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},
"postion":{"included":false,"journalName":"mysql-bin.000004","position":1581,"serverId":1,"timestamp":1507792855000}}

Информация о курсоре — это место, где экземпляр потребляет бинлог.Даже если клиент останавливается, он все равно остается в zk.

Примечание. 1001 — это фиксированное число ClientIdentity, и соответствующий исходный код находится в методе построения SimpleCanalConnector.

Ниже приводится сводка соответствующих записей в zk:

/otter/canal/
  |- cluster          ==> [192.168.6.52:11112, 192.168.6.52:11113]
  |- destinations     ==> instances
     |- example1/     ==> instance name
     |  |- cluster    ==> [192.168.6.52:11112, 192.168.6.52:11113]
     |  |- running    ==> {"active":true,"address":"192.168.6.52:11112","cid":2}
     |  |- 1001
     |     |-running  ==> {"active":true,"address":"10.57.241.44:53942","clientId":1001}
     |     |- cursor  ==> {localhost:3306,"journalName":"mysql-bin.000004","position":1331,"serverId":1}
     |- example2/
     |  |- cluster    ==> [192.168.6.52:11112, 192.168.6.52:11113]
     |  |- running    ==> {"active":true,"address":"192.168.6.52:11112","cid":2}
     |  |- 1001
     |     |-running  ==> {"active":true,"address":"10.57.241.44:53942","clientId":1001}
     |     |- cursor  ==> {localhost:3306,"journalName":"mysql-bin.000004","position":1331,"serverId":1}

На следующем рисунке представлена ​​блок-схема Canal Server HA:

  1. Когда сервер канала хочет запустить экземпляр канала, он сначала пытается начать оценку с помощью zookeeper (реализация: создать EPHEMERAL узел, которому будет разрешен запуск любого успешно созданного узла)
  2. После успешного создания узла zookeeper соответствующий сервер канала запустит соответствующий экземпляр канала.Если экземпляр канала не будет успешно создан, он будет находиться в состоянии ожидания.
  3. Как только zookeeper обнаруживает, что узел, созданный сервером каналов A, исчезает, он немедленно информирует другие серверы каналов о необходимости повторного выполнения операции на шаге 1 и повторно выбирает сервер каналов для запуска экземпляра.
  4. Каждый раз, когда клиент канала подключается, он сначала спрашивает у зоопарка, кто запустил инстанс канала, а затем устанавливает с ним линк.

Canal Client HA

Метод клиента канала аналогичен методу сервера канала, и он также контролируется способом вытеснения узлов EPHEMERA зоопарка.

Для проверки Canal Client HA вы можете обратиться к:blog.CSDN.net/Сяо Линьцзы 00…

  • Запустите несколько клиентов одновременно в IDEA, выполните оператор SQL, один из клиентов распечатает журнал, а другой нет.
  • Остановить клиент.
  • Выполните оператор SQL еще раз, другой клиент распечатает журнал

Журнал Client1:

****************************************************
* Batch Id: [3] ,count : [3] , memsize : [198] , Time : 2017-10-12 17:59:59
* Start : [mysql-bin.000004:1656:1507802398000(2017-10-12 17:59:58)] 
* End : [mysql-bin.000004:1831:1507802398000(2017-10-12 17:59:58)] 
****************************************************

================> binlog[mysql-bin.000004:1656] , executeTime : 1507802398000 , delay : 1188ms
 BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:1782] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802398000 , delay : 1199ms
uid : 1    type=int(4)
name : zqh    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:1831] , executeTime : 1507802398000 , delay : 1236ms
## stop the canal client## canal client is down.

После остановки Client1 журнал Client2:

****************************************************
* Batch Id: [4] ,count : [3] , memsize : [198] , Time : 2017-10-12 18:02:15
* Start : [mysql-bin.000004:1906:1507802534000(2017-10-12 18:02:14)] 
* End : [mysql-bin.000004:2081:1507802534000(2017-10-12 18:02:14)] 
****************************************************

================> binlog[mysql-bin.000004:1906] , executeTime : 1507802534000 , delay : 1807ms
 BEGIN ----> Thread id: 768
----------------> binlog[mysql-bin.000004:2032] , name[canal_test,test] , eventType : UPDATE , executeTime : 1507802534000 , delay : 1819ms
uid : 1    type=int(4)
name : zqhx    type=varchar(10)    update=true
----------------
 END ----> transaction id: 0
================> binlog[mysql-bin.000004:2081] , executeTime : 1507802534000 , delay : 1855ms

Наблюдайте за клиентским узлом, соответствующим экземпляру в узле ZK, который будет изменен при переключении клиента.
Например, клиент ниже переключается с порта 56806 на порт 56842.
После закрытия всех клиентов запуска под 1001 нет. Указывает, что у экземпляра нет клиента, использующего binlog.

[zk: 192.168.6.52:2181(CONNECTED) 29] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56806","clientId":1001}

[zk: 192.168.6.52:2181(CONNECTED) 30] get /otter/canal/destinations/example/1001/running
Node does not exist: /otter/canal/destinations/example/1001/running

[zk: 192.168.6.52:2181(CONNECTED) 31] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"10.57.241.44:56842","clientId":1001}

[zk: 192.168.6.52:2181(CONNECTED) 32] ls /otter/canal/destinations/example/1001
[cursor]

Конкретными классами, связанными с реализацией, являются: ClientRunningMonitor/ClientRunningListener/ClientRunningData.

Клиент, выполняющий связанный контроль, в основном для решения собственного механизма аварийного переключения клиента.
Клиент канала позволяет запускать несколько клиентов канала одновременно.Благодаря работающему механизму гарантируется, что работает только один клиент, а остальные клиенты используются в качестве холодного резерва.
Когда работающий клиент зависает, запуск будет контролировать холодный резервный клиент, чтобы переключиться в рабочий режим.
Это гарантирует, что клиент канала не будет одной точкой, а обеспечивает высокую доступность всей системы.

В левой части рисунка ниже показана реализация высокой доступности клиента, а в правой части — реализация высокой доступности сервера.

Develop Canal Client

Сначала поймите:GitHub.com/Alibaba/Misery…

subscribe change

Посмотрите еще раз на метод подписки CanalServerWithEmbedded. Мы знаем, что клиент вызовет метод subscribe() сразу после подключения к месту назначения на сервере.

Когда клиент подключается к серверу, он должен указать имя получателя, поскольку у сервера может быть несколько назначений. Например, сервер запускает два Экземпляра, и имена их назначения — example1 и example2 соответственно.
Предположим, есть два клиента A и B, A подключен к example1, а B подключен к example2. Словарь canalInstances сервера: {example1=>Instance1, example2->Instance2}.
Тогда назначение ClientA равно example1, а соответствующий экземпляр сервера — Instance1. Место назначения ClientB равно example2, а соответствующий экземпляр сервера — Instance3.

/**
 * 客户端订阅,重复订阅时会更新对应的filter信息
 */
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
    CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
    if (!canalInstance.getMetaManager().isStart()) {
        canalInstance.getMetaManager().start();
    }

    canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅

    // 根据Client从MetaManager中获取最近一次的Cursor
    Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
    if (position == null) { // 如果没有
        position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
        if (position != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
        }
        logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
    } else { // 有就直接使用
        logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
    }

    // 通知下订阅关系变化
    canalInstance.subscribeChange(clientIdentity);
}

О методе подписки есть два места: сам CanalInstance вызывает subscribeChange, а связанный с ним MetaManager также вызывает метод подписки.

CanalServer может иметь несколько экземпляров CanalInstance, и каждый экземпляр будет иметь MetaManager. Экземпляр соответствует клиенту.
Таким образом, у МетаМенеджера будет только один Клиент. Но из следующей структуры данных кажется, что MetaManager может иметь несколько пунктов назначения.

public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
    protected Map<String, List<ClientIdentity>>              destinations;
    protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
    protected Map<ClientIdentity, Position>                  cursors;

    public synchronized void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        List<ClientIdentity> clientIdentitys = destinations.get(clientIdentity.getDestination());
        if (clientIdentitys.contains(clientIdentity)) {
            clientIdentitys.remove(clientIdentity);
        }
        clientIdentitys.add(clientIdentity);
    }
}
Угадайте: несколько клиентов могут быть подключены к одному и тому же экземпляру (хотя будет работать только один экземпляр), поэтому один MetaManager может управлять несколькими клиентами.
НЕТ! HA Клиента отличается от Клиента, записанного MetaManager. HA означает, что одновременно работает только один Клиент, поэтому MetaManager не может одновременно записывать двух Клиентов.

В официальном документе ClientAPI: ClientIdentity — это идентификатор взаимодействия между клиентом и сервером канала.В настоящее время clientId жестко запрограммирован как 1001.
(В настоящее время экземпляр на сервере канала может использоваться только одним клиентом. Дизайн clientId зарезервирован для режима потребления одного экземпляра с несколькими клиентами, что в настоящее время не требуется.)

То есть: экземпляр все еще может иметь несколько подключенных клиентов.

Почему структура данных здесь разработана таким образом, вам также нужно обратиться к методу doSubscribeTest класса AbstractMetaManagerTest, чтобы понять.

На один и тот же пункт назначения могут быть подписаны разные клиенты. В следующем примере выполняется подписка на [client1, client2] и [client1, client3] соответственно.

public void doSubscribeTest(CanalMetaManager metaManager) {
    ClientIdentity client1 = new ClientIdentity(destination, (short) 1);
    metaManager.subscribe(client1);
    metaManager.subscribe(client1); // 重复调用:删除旧的client1,并继续增加新的client1
    ClientIdentity client2 = new ClientIdentity(destination, (short) 2);
    metaManager.subscribe(client2);

    List<ClientIdentity> clients = metaManager.listAllSubscribeInfo(destination);
    Assert.assertEquals(Arrays.asList(client1, client2), clients);

    metaManager.unsubscribe(client2);
    ClientIdentity client3 = new ClientIdentity(destination, (short) 3);
    metaManager.subscribe(client3);

    clients = metaManager.listAllSubscribeInfo(destination);
    Assert.assertEquals(Arrays.asList(client1, client3), clients);
}

Метод подписки CanalServerWithEmbedded, наконец, вызовет метод subscribeChange класса AbstractCanalInstance.
Здесь будет установлен фильтр имени таблицы, а также черный список. Элементы конфигурации находятся в instance.properties.

# table regex
canal.instance.filter.regex = .*\\..*
# table black regex
canal.instance.filter.black.regex =

Фильтр указывает, какие таблицы binlogs MySQL должны быть получены клиентом через Canal Server.Приведенный выше элемент конфигурации указывает, что все таблицы должны быть получены.

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {
    protected Long                                   canalId;                                                      // 和manager交互唯一标示
    protected String                                 destination;                                                  // 队列名字
    protected CanalEventStore<Event>                 eventStore;                                                   // 有序队列

    protected CanalEventParser                       eventParser;                                                  // 解析对应的数据信息
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 链接parse和store的桥接器
    protected CanalMetaManager                       metaManager;                                                  // 消费信息管理器
    protected CanalAlarmHandler                      alarmHandler;                                                 // alarm报警机制

    @Override
    public boolean subscribeChange(ClientIdentity identity) {
        if (StringUtils.isNotEmpty(identity.getFilter())) {
            logger.info("subscribe filter change to " + identity.getFilter());
            AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

            boolean isGroup = (eventParser instanceof GroupEventParser);
            if (isGroup) {
                // 处理group的模式
                List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
                for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
                    ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
                }
            } else {
                ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
            }
        }

        // filter的处理规则
        // a. parser处理数据过滤处理
        // b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据
        // 后续内存版的一对多分发,可以考虑
        return true;
    }
}

Соответственно в EventParser есть две ссылки на Filter. Например, описанный выше метод eventParser.setEventFilter() установит eventFilter класса AbstractEventParser.

public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> {
    protected CanalLogPositionManager                logPositionManager         = null;
    protected CanalEventSink<List<CanalEntry.Entry>> eventSink                  = null;
    protected CanalEventFilter                       eventFilter                = null;
    protected CanalEventFilter                       eventBlackFilter           = null;
}

EventParser Implement

Метод start() класса AbstractEventParser является основным методом анализа binlog. После запуска transactionBuffer и BinLogParser фоновый рабочий поток parseThread будет запускаться постоянно:

Примечание. Следующие шаги вложены в бесконечный цикл while, и сон будет выполнен в конце.

// 开始执行replication
// 1. 构造Erosa连接
erosaConnection = buildErosaConnection();

// 2. 启动一个心跳线程
startHeartBeat(erosaConnection);

// 3. 执行dump前的准备工作
preDump(erosaConnection);

// 4. 连接MySQL数据库
erosaConnection.connect(); 

// 5. 获取最后的位置信息
EntryPosition startPosition = findStartPosition(erosaConnection);
logger.info("find start position : {}", startPosition.toString());
// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();

// 定义回调函数,当解析成功后,sink()方法会暂存到缓冲区transactionBuffer中。缓冲区的数据会通过心跳线程放入EventSink
final SinkFunction sinkHandler = new SinkFunction<EVENT>() {
    private LogPosition lastPosition;

    public void sink(EVENT event) {
        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
        if (entry != null) {
            transactionBuffer.add(entry);
            this.lastPosition = buildLastPosition(entry);  // 记录一下对应的positions
        }
    }
};

// 6. 开始dump数据
if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
    erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
} else {
    erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler);
}

Здесь erosaConnection относится к соединению Canal Server с MySQL.
Клиентское (CanalClient) соединение CanalConnector, о котором мы упоминали ранее, относится к соединению между CanalClient и CanalServer.

Соединение CanalServer с MySQL предназначено для получения пакетов дампа binlog. Существуют различные запросы (GET/ACK и т. д.) от CanalClient к CanalServer.

Мы не будем подробно разбирать процесс дампа, а посмотрим, как MySQL-реализация erosaConnection, MysqlConnection, вызывает callback-функцию после получения события.

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    sendBinlogDump(binlogfilename, binlogPosition);
    // connector指的是CanalServer到MySQL Master服务器的连接,创建一个拉取线程拉取MySQL的binlog
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) { // 由于设置了缓冲区的大小,每次dump都只会拉取一批数据
        LogEvent event = null;
        event = decoder.decode(fetcher, context);
        if (!func.sink(event)) break; // 调用回调方法
    }
}

На сервере есть поток пульса, целью которого является потребление transactionBuffer и запись его в EventSink.

protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) {
    boolean result = eventSink.sink(entrys, 
        (runningInfo == null) ? null : runningInfo.getAddress(), destination);
    return result;
}

EventSink в конечном итоге будет записывать данные в EventStore, то есть помещать в RingBuffer.

eunomia

[zk: 192.168.6.55:2181(CONNECTED) 3] ls /otter/canal/destinations
[octopus_demeter, example_bak, namelist_test, xiaopang2, namelist2, xiaopang3, namelist1, example, xiaopang]

[zk: 192.168.6.55:2181(CONNECTED) 4] ls /otter/canal/destinations/xiaopang
[eunomia, cluster, 1001, running]

[zk: 192.168.6.55:2181(CONNECTED) 5] ls /otter/canal/destinations/xiaopang/eunomia
[_c_2a900d4e-75fb-4445-b30c-04e1bdb2e5d9-lock-0001381746, runnning, _c_ea33db37-9193-4c75-9e61-85e59e123109-lock-0001381738]

// Eunomia Server?还是Canal Client?
[zk: 192.168.6.55:2181(CONNECTED) 7] get /otter/canal/destinations/xiaopang/eunomia/runnning
10.57.17.100

[zk: 192.168.6.55:2181(CONNECTED) 18] get /otter/canal/destinations/xiaopang/1001/running
{"active":true,"address":"10.57.17.100:60661","clientId":1001}