База данных Mysql слушает binlog

MySQL

предисловие

Нам часто нужно что-то делать на основе того, что пользователи делают со своими данными.

Например, если пользователь удаляет свою учетную запись, мы даем емунапиши ему, чтобы отправить текстовое сообщение с просьбой вернуться.

Подобно этой функции, конечно, ее можно реализовать на уровне бизнес-логики и выполнять эту операцию после получения запроса пользователя на удаление, но бинлог базы данных предоставляет нам другой метод работы.

Чтобы отслеживать бинлог, вам нужно сделать два шага: первый шаг, конечно же, ваш mysql должен включить эту функцию, а второй — написать программу для чтения лога.

mysql открыть бинлог.

Во-первых, бинлог mysql открывается не каждый день, поэтому нам нужно:

  1. Найдите файл конфигурации mysqlmy.cnf, Это потому что операционная система другая, локация не обязательно та же, можете найти сами,
  2. Добавьте к нему следующее:

[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW


  1. Затем перезапустите mysql.
/ ubuntu
service mysql restart
// mac
mysql.server restart
  1. Проверьте, успешно ли он включен

Введите командную строку mysql и выполните:


show variables like '%log_bin%' ;

Если результат такой, как показано ниже, это означает успех:

2019-04-29-00-31-29

  1. Проверьте статус записываемого бинлога:

2019-04-29-00-32-14

код для чтения бинлога

импортировать зависимости

Мы используем некоторые реализации с открытым исходным кодом, потому что некоторыестранная причина, Я выбралmysql-binlog-connector-javaЭтот пакет (официальный репозиторий github)[GitHub.com/это о/ничего…] Конкретные зависимости следующие:

<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
    <dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.17.0</version>
    </dependency>

Конечно, существует множество реализаций с открытым исходным кодом для обработки бинарных журналов.canclТолько один, вы тоже можете использовать его.

написать демо

Согласно ридми в официальном репозитории, просто напишите демо.

    public static void main(String[] args) {
        BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        client.registerEventListener(new BinaryLogClient.EventListener() {

            @Override
            public void onEvent(Event event) {
                // TODO
                dosomething();
                logger.info(event.toString());
            }
        });
        client.connect();
    }

Это полностью написано по официальному туториалу.Вы можете написать свою бизнес-логику в onEvent.Так как я только тестирую, я буду распечатывать каждое событие в нем.

После этого я вручную вошел в mysql и добавил, изменил и удалил операции соответственно.Отслеживаемые журналы выглядят следующим образом:

00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
    {before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
    [[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}


Инкапсулируйте лучший и более индивидуальный класс инструментов в соответствии с вашим бизнесом.

В начале я планировал выложить код, , но чем больше кода написано, тем больше его выложено на github, а здесь выложена только часть реализации.кодовый портал

Реализовать идеи

  1. Поддерживается прослушивание одной таблицы, потому что мы не хотим на самом деле прослушивать все таблицы во всех базах данных.
  2. Может быть многопоточное потребление.
  3. Конвертируем отслеживаемый контент в понравившийся нам вид (структура данных в тексте не обязательно очень хорошая, более подходящей я и не ожидал).

Итак, идея реализации примерно такова:

  1. Инкапсулируйте клиента, предоставьте внешнему миру только метод получения и защитите подробный код инициализации.
  2. Предоставляет метод регистрации слушателя (псевдо), который может зарегистрировать слушателя в таблице (переопределить интерфейс слушателя, и все зарегистрированные слушатели могут реализовать это).
  3. Настоящим слушателем является только клиент, он прослушивает все операции над этим экземпляром базы данных и преобразует их в нужный нам формат.LogItemПоставить в очередь на блокировку.
  4. Запустите несколько потоков, используйте блокирующие очереди иLogItemВызовите прослушиватель соответствующей таблицы данных и выполните некоторую бизнес-логику.

Код инициализации:

    public MysqlBinLogListener(Conf conf) {
        BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setCompatibilityMode(
                EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
        );
        client.setEventDeserializer(eventDeserializer);
        this.parseClient = client;
        this.queue = new ArrayBlockingQueue<>(1024);
        this.conf = conf;
        listeners = new ConcurrentHashMap<>();
        dbTableCols = new ConcurrentHashMap<>();
        this.consumer = Executors.newFixedThreadPool(consumerThreads);
    }

Регистрационный код:

    public void regListener(String db, String table, BinLogListener listener) throws Exception {
        String dbTable = getdbTable(db, table);
        Class.forName("com.mysql.jdbc.Driver");
        // 保存当前注册的表的colum信息
        Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
        Map<String, Colum> cols = getColMap(connection, db, table);
        dbTableCols.put(dbTable, cols);

        // 保存当前注册的listener
        List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
        list.add(listener);
        listeners.put(dbTable, list);
    }

На этом этапе при регистрации слушателя мы получаем информацию о схеме таблицы и сохраняем ее на карте, чтобы облегчить последующую обработку данных.

Код мониторинга:

    @Override
    public void onEvent(Event event) {
        EventType eventType = event.getHeader().getEventType();

        if (eventType == EventType.TABLE_MAP) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            dbTable = getdbTable(db, table);
        }

        // 只处理添加删除更新三种操作
        if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
            if (isWrite(eventType)) {
                WriteRowsEventData data = event.getData();
                for (Serializable[] row : data.getRows()) {
                    if (dbTableCols.containsKey(dbTable)) {
                        LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
                        e.setDbTable(dbTable);
                        queue.add(e);
                    }
                }
            }
        }
    }

Мне лень, тут реализована только обработка операции добавления, а остальные операции не пишутся.

код потребления:


    public void parse() throws IOException {
        parseClient.registerEventListener(this);

        for (int i = 0; i < consumerThreads; i++) {
            consumer.submit(() -> {
                while (true) {
                    if (queue.size() > 0) {
                        try {
                            LogItem item = queue.take();
                            String dbtable = item.getDbTable();
                            listeners.get(dbtable).forEach(l -> {
                                l.onEvent(item);
                            });

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    Thread.sleep(1000);
                }
            });
        }
        parseClient.connect();
    }

При потреблении получить элемент из очереди, а затем получить соответствующий один или несколько слушателей для потребления элемента соответственно.

Тестовый код:

    public static void main(String[] args) throws Exception {
        Conf conf = new Conf();
        conf.host = "hostname";
        conf.port = 3306;
        conf.username = conf.passwd = "hhsgsb";

        MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
        mysqlBinLogListener.parseArgsAndRun(args);
        mysqlBinLogListener.regListener("pf", "student", item -> {
            System.out.println(new String((byte[])item.getAfter().get("name")));
            logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
        });
        mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));

        mysqlBinLogListener.parse();
    }

В этом маленьком коде прописаны два слушателя, и они отслеживаются отдельно.studentиteacherтаблицы и обработки печати отдельно, после тестирования, вteacherПри вставке данных в таблицу определенная бизнес-логика может выполняться независимо.

Примечание. Класс инструмента здесь нельзя использовать напрямую, потому что многие исключения не обрабатываются, а функция слушает только оператор вставки, который можно использовать в качестве эталона для реализации.

Справочная статья

GitHub.com/это о/ничего…

cloud.Tencent.com/developer/ ах…





ChangeLog

2019-04-30 Конец 01.05.2019 Заменить карту на мультикарту>

Все вышеизложенное является личными мыслями, если есть какие-либо ошибки, пожалуйста, исправьте их в комментариях.

Добро пожаловать на перепечатку, пожалуйста, подпишите и сохраните исходную ссылку.

Контактный адрес электронной почты: huyanshi2580@gmail.com

Дополнительные заметки об обучении см. в личном блоге ------>Хуян тен