предисловие
Нам часто нужно что-то делать на основе того, что пользователи делают со своими данными.
Например, если пользователь удаляет свою учетную запись, мы даем емунапиши ему, чтобы отправить текстовое сообщение с просьбой вернуться.
Подобно этой функции, конечно, ее можно реализовать на уровне бизнес-логики и выполнять эту операцию после получения запроса пользователя на удаление, но бинлог базы данных предоставляет нам другой метод работы.
Чтобы отслеживать бинлог, вам нужно сделать два шага: первый шаг, конечно же, ваш mysql должен включить эту функцию, а второй — написать программу для чтения лога.
mysql открыть бинлог.
Во-первых, бинлог mysql открывается не каждый день, поэтому нам нужно:
- Найдите файл конфигурации mysql
my.cnf
, Это потому что операционная система другая, локация не обязательно та же, можете найти сами, - Добавьте к нему следующее:
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog-format = ROW
- Затем перезапустите mysql.
/ ubuntu
service mysql restart
// mac
mysql.server restart
- Проверьте, успешно ли он включен
Введите командную строку mysql и выполните:
show variables like '%log_bin%' ;
Если результат такой, как показано ниже, это означает успех:
- Проверьте статус записываемого бинлога:
код для чтения бинлога
импортировать зависимости
Мы используем некоторые реализации с открытым исходным кодом, потому что некоторыестранная причина, Я выбралmysql-binlog-connector-java
Этот пакет (официальный репозиторий github)[GitHub.com/это о/ничего…] Конкретные зависимости следующие:
<!-- https://mvnrepository.com/artifact/com.github.shyiko/mysql-binlog-connector-java -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.17.0</version>
</dependency>
Конечно, существует множество реализаций с открытым исходным кодом для обработки бинарных журналов.cancl
Только один, вы тоже можете использовать его.
написать демо
Согласно ридми в официальном репозитории, просто напишите демо.
public static void main(String[] args) {
BinaryLogClient client = new BinaryLogClient("hostname", 3306, "username", "passwd");
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
// TODO
dosomething();
logger.info(event.toString());
}
});
client.connect();
}
Это полностью написано по официальному туториалу.Вы можете написать свою бизнес-логику в onEvent.Так как я только тестирую, я буду распечатывать каждое событие в нем.
После этого я вручную вошел в mysql и добавил, изменил и удалил операции соответственно.Отслеживаемые журналы выглядят следующим образом:
00:23:13.331 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=0, eventType=ROTATE, serverId=1, headerLength=19, dataLength=28, nextPosition=0, flags=32}, data=RotateEventData{binlogFilename='mysql-bin.000001', binlogPosition=886}}
00:23:13.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468403000, eventType=FORMAT_DESCRIPTION, serverId=1, headerLength=19, dataLength=100, nextPosition=0, flags=0}, data=FormatDescriptionEventData{binlogVersion=4, serverVersion='5.7.23-0ubuntu0.16.04.1-log', headerLength=19, dataLength=95}}
00:23:23.715 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=951, flags=0}, data=null}
00:23:23.716 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1021, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:23.721 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1072, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:23.724 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=EXT_WRITE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1114, flags=0}, data=WriteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@546a03af, 2]
]}}
00:23:23.725 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468603000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1145, flags=0}, data=XidEventData{xid=28}}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1210, flags=0}, data=null}
00:23:55.872 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1280, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:23:55.873 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1331, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=EXT_UPDATE_ROWS, serverId=1, headerLength=19, dataLength=31, nextPosition=1381, flags=0}, data=UpdateRowsEventData{tableId=108, includedColumnsBeforeUpdate={0, 1}, includedColumns={0, 1}, rows=[
{before=[[B@6833ce2c, 1], after=[[B@725bef66, 3]}
]}}
00:23:55.875 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468635000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1412, flags=0}, data=XidEventData{xid=41}}
00:24:22.333 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=1477, flags=0}, data=null}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=QUERY, serverId=1, headerLength=19, dataLength=51, nextPosition=1547, flags=8}, data=QueryEventData{threadId=4, executionTime=0, errorCode=0, database='pf', sql='BEGIN'}}
00:24:22.334 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=TABLE_MAP, serverId=1, headerLength=19, dataLength=32, nextPosition=1598, flags=0}, data=TableMapEventData{tableId=108, database='pf', table='student', columnTypes=15, 3, columnMetadata=135, 0, columnNullability={}}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=EXT_DELETE_ROWS, serverId=1, headerLength=19, dataLength=23, nextPosition=1640, flags=0}, data=DeleteRowsEventData{tableId=108, includedColumns={0, 1}, rows=[
[[B@1888ff2c, 3]
]}}
00:24:22.335 [main] INFO util.MysqlBinLog - Event{header=EventHeaderV4{timestamp=1556468662000, eventType=XID, serverId=1, headerLength=19, dataLength=12, nextPosition=1671, flags=0}, data=XidEventData{xid=42}}
Инкапсулируйте лучший и более индивидуальный класс инструментов в соответствии с вашим бизнесом.
В начале я планировал выложить код, , но чем больше кода написано, тем больше его выложено на github, а здесь выложена только часть реализации.кодовый портал
Реализовать идеи
- Поддерживается прослушивание одной таблицы, потому что мы не хотим на самом деле прослушивать все таблицы во всех базах данных.
- Может быть многопоточное потребление.
- Конвертируем отслеживаемый контент в понравившийся нам вид (структура данных в тексте не обязательно очень хорошая, более подходящей я и не ожидал).
Итак, идея реализации примерно такова:
- Инкапсулируйте клиента, предоставьте внешнему миру только метод получения и защитите подробный код инициализации.
- Предоставляет метод регистрации слушателя (псевдо), который может зарегистрировать слушателя в таблице (переопределить интерфейс слушателя, и все зарегистрированные слушатели могут реализовать это).
- Настоящим слушателем является только клиент, он прослушивает все операции над этим экземпляром базы данных и преобразует их в нужный нам формат.
LogItem
Поставить в очередь на блокировку. - Запустите несколько потоков, используйте блокирующие очереди и
LogItem
Вызовите прослушиватель соответствующей таблицы данных и выполните некоторую бизнес-логику.
Код инициализации:
public MysqlBinLogListener(Conf conf) {
BinaryLogClient client = new BinaryLogClient(conf.host, conf.port, conf.username, conf.passwd);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
client.setEventDeserializer(eventDeserializer);
this.parseClient = client;
this.queue = new ArrayBlockingQueue<>(1024);
this.conf = conf;
listeners = new ConcurrentHashMap<>();
dbTableCols = new ConcurrentHashMap<>();
this.consumer = Executors.newFixedThreadPool(consumerThreads);
}
Регистрационный код:
public void regListener(String db, String table, BinLogListener listener) throws Exception {
String dbTable = getdbTable(db, table);
Class.forName("com.mysql.jdbc.Driver");
// 保存当前注册的表的colum信息
Connection connection = DriverManager.getConnection("jdbc:mysql://" + conf.host + ":" + conf.port, conf.username, conf.passwd);
Map<String, Colum> cols = getColMap(connection, db, table);
dbTableCols.put(dbTable, cols);
// 保存当前注册的listener
List<BinLogListener> list = listeners.getOrDefault(dbTable, new ArrayList<>());
list.add(listener);
listeners.put(dbTable, list);
}
На этом этапе при регистрации слушателя мы получаем информацию о схеме таблицы и сохраняем ее на карте, чтобы облегчить последующую обработку данных.
Код мониторинга:
@Override
public void onEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableData = event.getData();
String db = tableData.getDatabase();
String table = tableData.getTable();
dbTable = getdbTable(db, table);
}
// 只处理添加删除更新三种操作
if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
if (isWrite(eventType)) {
WriteRowsEventData data = event.getData();
for (Serializable[] row : data.getRows()) {
if (dbTableCols.containsKey(dbTable)) {
LogItem e = LogItem.itemFromInsert(row, dbTableCols.get(dbTable));
e.setDbTable(dbTable);
queue.add(e);
}
}
}
}
}
Мне лень, тут реализована только обработка операции добавления, а остальные операции не пишутся.
код потребления:
public void parse() throws IOException {
parseClient.registerEventListener(this);
for (int i = 0; i < consumerThreads; i++) {
consumer.submit(() -> {
while (true) {
if (queue.size() > 0) {
try {
LogItem item = queue.take();
String dbtable = item.getDbTable();
listeners.get(dbtable).forEach(l -> {
l.onEvent(item);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Thread.sleep(1000);
}
});
}
parseClient.connect();
}
При потреблении получить элемент из очереди, а затем получить соответствующий один или несколько слушателей для потребления элемента соответственно.
Тестовый код:
public static void main(String[] args) throws Exception {
Conf conf = new Conf();
conf.host = "hostname";
conf.port = 3306;
conf.username = conf.passwd = "hhsgsb";
MysqlBinLogListener mysqlBinLogListener = new MysqlBinLogListener(conf);
mysqlBinLogListener.parseArgsAndRun(args);
mysqlBinLogListener.regListener("pf", "student", item -> {
System.out.println(new String((byte[])item.getAfter().get("name")));
logger.info("insert into {}, value = {}", item.getDbTable(), item.getAfter());
});
mysqlBinLogListener.regListener("pf", "teacher", item -> System.out.println("teacher ===="));
mysqlBinLogListener.parse();
}
В этом маленьком коде прописаны два слушателя, и они отслеживаются отдельно.student
иteacher
таблицы и обработки печати отдельно, после тестирования, вteacher
При вставке данных в таблицу определенная бизнес-логика может выполняться независимо.
Примечание. Класс инструмента здесь нельзя использовать напрямую, потому что многие исключения не обрабатываются, а функция слушает только оператор вставки, который можно использовать в качестве эталона для реализации.
Справочная статья
cloud.Tencent.com/developer/ ах…
ChangeLog
2019-04-30 Конец 01.05.2019 Заменить карту на мультикарту>Все вышеизложенное является личными мыслями, если есть какие-либо ошибки, пожалуйста, исправьте их в комментариях.
Добро пожаловать на перепечатку, пожалуйста, подпишите и сохраните исходную ссылку.
Контактный адрес электронной почты: huyanshi2580@gmail.com
Дополнительные заметки об обучении см. в личном блоге ------>Хуян тен