Другой способ, задержка синхронизации master-slave MySQL, это решение тоже очень хорошее

Java
Другой способ, задержка синхронизации master-slave MySQL, это решение тоже очень хорошее

WX искал [внутренние дела программиста], и ответ [666] был замечательным.

1. Что такое канал?

canalЭто инфраструктура, разработанная Alibaba, которая обеспечивает подписку и потребление дополнительных данных на основе анализа добавочного журнала базы данных.JAVAразработка, в настоящее время поддерживается толькоMysqlиMariaDB(аналогично mysql).

Так что же такое инкрементный журнал базы данных?

Существует много типов журналов MySQL, в основном в том числе: журналы ошибок, журналы запросов, журналы медленных запросов, журналы транзакций и двоичные журналы. иMySQLИзменения данных, происходящие в базе данных (DML(язык манипулирования данными) язык манипулирования данными, то есть мы знакомы с добавлениями, удалениями и изменениями), будет в бинарном логе (binary log) в виде хранения.

2. Принцип канала

ПредставляемcanalПрежде чем принцип, давайте рассмотримMySQLПринцип синхронизации master-slave, который может дать вам лучшее пониманиеcanalрабочий механизм.

1. Принцип синхронизации master-slave MySQL:

Синхронизация master-slave MySQL также называется разделением чтения и записи, что может улучшить нагрузку и отказоустойчивость базы данных и обеспечить высокую доступность базы данных.

Давайте сначала проанализируем принципиальную схему синхронизации master-slave MySQL:在这里插入图片描述

Вышеуказанные фотографии взяты из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь с нами, чтобы удалить

Процесс работы мастер-узла:

когдаmasterПосле изменения данных узла (удаления, обновления, вставки или создания функций, хранимых процедур и т. д.)binary logПишите записи в журнал, эти записи еще называют二进制日志事件(бинарные события журнала).

show binlog events 

在这里插入图片描述  Эти события последовательно записываются в бинарный журнал. Когда подчиненный узел начинает подключаться к главному узлу, главный узел открывает поток дампа binlog для подчиненного узла (отвечающего за передачу данных binlog).

Как только бинарный журнал главного узла изменится, бинарный журналПоток дампа уведомит подчиненный узел о наличии бинарных журналов, которые можно передать, и отправит соответствующее содержимое бинарного журнала на подчиненный узел.

Процесс работы подчиненного узла:

На ведомом узле будут созданы два потока: одинпоток ввода/вывода,ОдинSQL-поток. Поток ввода/вывода подключается к главному узлу, аbinlog dumpПоток отправит содержимое binlog потоку ввода-вывода.

После того, как поток ввода/вывода получает содержимое binlog, он записывает содержимое в локальный журнал ретрансляции. Поток sql читает журнал ретрансляции, записанный потоком ввода-вывода, и записывает содержимое журнала ретрансляции в подчиненную базу данных.


2, принцип канала

После понимания вышеприведенного принципа синхронизации master-slave в MySQL рабочий механизм канала становится понятным. По сути, канал имитирует протокол взаимодействия между подчиненным узлом и главным узлом в базе данных MySQL, притворяется подчиненным узлом MySQL и отправляет сообщение главному узлу MySQL.dump协议, главный узел MySQL получает запрос на создание дампа и начинает передавать двоичный журнал подчиненному узлу (то есть,canal).在这里插入图片描述

Вышеуказанные фотографии взяты из Интернета, если есть какие-либо нарушения, пожалуйста, свяжитесь с нами, чтобы удалить

Просто говорите и не практикуйте фальшивую ручку, давайте сделаем это!

В-третьих, канал реализует «мониторинг» MySQL.

Прежде чем писать код, давайте внесем некоторые изменения в MySQL, я не буду вдаваться в подробности установки MySQL, а основные операции будут выполнены.

1. Проверьте, включена ли в MySQL функция двоичного журнала.

show binary logs 

Если он не включен, он находится в состоянии, показанном на картинке.Обычные пользователи не имеют права на эту команду, но я имею, цк цк цк!在这里插入图片描述Если ручная активация не требуется иmy.cnfконфигурация в файлеbinlog-formatзаRowмодель

log-bin=mysq-bin
binlog-format=Row

log-binдаbinlogместо хранения файловbinlog-formatУстановите способ, которым MySQL реплицирует лог-бин

Три метода репликации MySQL:

Репликация на основе операторов (SBR)

  • Преимущества: sql, который изменяет данные, хранится в binlog, нет необходимости записывать каждый sql и изменения данных, объем binlog будет небольшим, накладные расходы ввода-вывода низкие, а производительность хорошая.
  • Недостаток: это приведет к несогласованности данных в master-slave.

репликация на основе строк (RBR)

  • Преимущества: Контекстная информация каждого SQL-оператора не записывается, только какие данные были изменены и что было изменено.
  • Недостатки: Объем бинарного журнала очень велик, особенно при использовании атрибута alter table будет сгенерирован большой объем данных бинарного журнала.

Смешанная репликация (MBR)

  • Соответственно есть три формата бинлога: STATEMENT, ROW, MIXED.

2. Создайте пользователя с разрешением на работу с MySQL для канала.

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3. Установите канал

Адрес загрузки: https://github.com/alibaba/canal/releases

После скачивания выберите версию например: canal.deployer-xxx.tar.gz

4. Настроить канал

Для изменения файла instance.properties необходимо добавить правила мониторинга баз данных и таблиц, Canal может отслеживать всю базу данных или таблицу, что более гибко.

vim conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改自己的数据库(canal要监听的数据库 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的监听规则 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex = 

пусковой канал

sh bin/startup.sh

Посмотрите журнал сервера, чтобы убедиться, что канал запускается нормально.

vi logs/canal/canal.log

Сервер Display Canal работает сейчас успешно

2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

5. Напишите клиентский код Java для реализации мониторинга каналов.

импортировать пакет зависимостей

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

Вот простая реализация

public class MainApp {

    public static void main(String... args) throws Exception {

        /**
         * 创建与
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");

        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            /**
             * 监控数据库中所有表
             */
            connector.subscribe(".*\\..*");
            /**
             * 指定要监控的表,库名.表名
             */
            //connector.subscribe("xin-master.jk_order");
            connector.rollback();

            //120次心跳过后未检测到,跳出
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                /**
                 *  提交确认
                 */
                connector.ack(batchId);
                /**
                 * 处理失败, 回滚数据
                 */
                connector.rollback(batchId);
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
            /**
             * 手动开启事务回滚
             */
            //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
                    .EntryType
                    .TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

Здесь написан код, запустим сервис, посмотрим, что получится, так как база не работает, результаты мониторинга пустые.在这里插入图片描述Далее мы выполняемupdateпредложение попробовать

update jk_orderset order_no = '1111'  where id = 40

Консоль обнаруживает изменения базы данных и создает файл журнала binlog.mysql-bin.000009:3830 在这里插入图片描述Итак, как использовать сгенерированный файл binlog и как преобразовать его в инструкцию SQl?

<!-- mysql binlog解析 -->
        <dependency>
            <groupId>com.github.shyiko</groupId>
            <artifactId>mysql-binlog-connector-java</artifactId>
            <version>0.13.0</version>
</dependency>

Загрузите файл binlog прямо сейчас и протестируйте его локально

 public static void main(String[] args) throws IOException {
        String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                System.out.println(event.toString());
            }
        } finally {
            reader.close();
        }
    }

Проверьте результат выполнения и обнаружите, что самой последней операцией базы данных является добавление индекса idx_index.

Event{header=EventHeaderV4{timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0}, data=null}
Event{header=EventHeaderV4{timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0}, data=QueryEventData{threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
DROP INDEX `idx_index` ,
ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'}}
Event{header=EventHeaderV4{timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0}, data=null}

До сих пор мы реализовали мониторинг MySQL,

Четыре сценария применения канала

canalСценарии применения примерно следующие:

  • Решить проблему задержки синхронизации master-slave в MySQL.
  • Резервное копирование базы данных в режиме реального времени
  • Многоуровневый индекс (у продавцов и покупателей есть собственный индекс подбазы данных)
  • Реализовать обновление бизнес-кэша
  • Важные деловые новости, такие как изменение цен

Сосредоточьтесь на анализе того, как канал решает проблему задержки синхронизации master-slave в MySQL.

в производственной средеMySQLСинхронный режим ведущий-ведомый (maser-slave) очень распространен, но для кластеров, развернутых в компьютерных залах, будут возникать задержки синхронизации. Возьмите каштан:

Статус заказа не оплачен,masterНода модифицируется на платную, но по каким-то причинам задержанные данные не могут быть синхронизированы сslave, то пользователь сразу проверяет статус заказа (запросы идутslave) показывает, что платеж до сих пор не оплачен, поэтому пользователь не должен паниковать, увидев эту ситуацию.

Почему возникает задержка синхронизации ведущий-ведомый?

Когда основная библиотекаmasterизTPSКогда параллелизм высокий,masterУзлы одновременно генерируют операции модификации, в то время какslaveузлаsql线程Это однопоточная обработка синхронных данных, и задержка генерируется естественным образом.

мы используемcanalмониторинг в реальном времениmaserОбновление данных узла (можно отслеживать для таблицы),canalСразу после отлова измененного SQLslaveВыполнение узла, чтобы решить проблему задержки ведущий-ведомый.

Однако причин для синхронизации ведущий-подчиненный больше, чем это.Поскольку главный-подчиненный сервер существует между машинами и компьютерными комнатами, в дополнение к причинам пропускной способности сети, стабильность сети и синхронизация между машинами являются основными причинами синхронизации ведущий-подчиненный. следует рассматривать.

Суммировать

В этой статье просто реализована функция мониторинга каналов по базе данных, и ее цель — предоставить вам способ решения проблемы.

Организовал сотни различных технических электронных книг и видеоматериалов,嘘~,免费Отправить, ответить в официальном аккаунте【666] Самовывоз. Я построил один с моими друзьями技术交流群, вместе обсуждайте технологии и делитесь технической информацией, стремясь вместе учиться и совершенствоваться, присоединяйтесь к нам, если вам это интересно!