Подробное объяснение инструмента синтаксического анализа MySQL Binlog Maxwell
Максвелл Профиль
Maxwell — это приложение, которое может читать двоичный журнал двоичного журнала MySQL в режиме реального времени и генерировать сообщения в формате JSON в качестве производителя для отправки на Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, файловые или другие платформы. Его общие сценарии применения включают ETL, обслуживание кэша, сбор dml-индикаторов на уровне таблиц, инкрементный переход к поисковым системам, миграцию разделов данных и схемы отката binlog для сокращения базы данных. Официальный сайт (http://maxwells-daemon.io), GitHub (https://github.com/zendesk/maxwell)
Maxwell в основном предоставляет следующие функции:
- служба поддержки
SELECT * FROM table
способ инициализировать полный объем данных - Поддержка автоматического восстановления местоположения binlog (GTID) после аварийного переключения в основной библиотеке.
- Данные могут быть секционированы для решения проблемы перекоса данных, а данные, отправляемые в Kafka, поддерживают секционирование данных на уровне базы данных, таблицы, столбца и других уровнях.
- Работа маскируется под Slave, получает события binlog, а затем собирается в соответствии со схемами информации, может принимать различные события ddl, xid, row и т.д.
В дополнение к Maxwell, в настоящее время широко используемые инструменты для синтаксического анализа MySQL Binlog в основном включают канал Али и mysql_streamer. Три инструмента сравниваются следующим образом:
Canal разработан на Java и разделен на сервер и клиент, у него много производных приложений со стабильной производительностью и мощными функциями, canal должен написать свой собственный клиент для использования данных, проанализированных canal.
Преимущество maxwell перед canal в том, что он прост в использовании, он напрямую выводит изменения данных в виде строк json без необходимости писать клиент.
быстрый старт
Во-первых, MySQL должен включить BinLog сначала. Для чего есть Binlog Mysql, вы можете обратиться к статье »Введение в Бинлог MySQL》
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
Создайте пользователя Maxwell и дайте некоторые разрешения для библиотеки maxwell.
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
Вам нужно запустить kafka перед использованием maxwell
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
Перед запуском kafka на одной машине необходимо изменить файл конфигурации и открыть файл конфигурации.vi config/server.properties
, добавьте в конец файлаadvertised.host.name
Конфигурация, значение — IP машины, на которой стоит кафка
advertised.host.name=10.100.97.246
В противном случае будет сообщено об исключении, когда maxwell будет запущен позже через докер (hadoop2 — это имя моего хоста)
17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
... 6 more
Затем вы можете начать кафку
bin/kafka-server-start.sh config/server.properties
тест кафка
# 创建一个 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 列出所有 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
# 启动一个生产者,然后随意发送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
# 在另一个终端启动一下消费者,观察所消费的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
Быстро установить и использовать Maxwell через докер (конечно, вам нужно установить докер самостоятельно)
# 拉取镜像
docker pull zendesk/maxwell
# 启动maxwell,并将解析出的binlog输出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout
Протестируйте Максвелла, сначала создайте простую таблицу, затем добавляйте, изменяйте и удаляйте данные
CREATE TABLE `test` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`age` int(11) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into test values(1,22,"小旋锋");
update test set name='whirly' where id=1;
delete from test where id=1;
Наблюдайте за выводом консоли Docker.Из журнала вывода вы можете увидеть формат строки JSON binlog, проанализированной Maxwell.
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋锋"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋锋"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}
Вывод в кафка, Закрыть докер, сброс параметров запуска
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug
Затем запустите потребитель, чтобы потреблять сообщения темы maxwell и наблюдать за его выводом; снова выполните SQL для добавления, изменения и удаления данных, вы все равно можете получить тот же результат.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell
Формат выходной строки JSON
- Данные Последние данные, модифицированные данные
- старые старые данные, данные до модификации
- введите тип операции, здесь вставка, обновление, удаление, создание базы данных, изменение базы данных, удаление базы данных, создание таблицы, изменение таблицы, удаление таблицы, вставка начальной загрузки, int (неизвестный тип)
- ID транзакции XID
- commit Один и тот же xid представляет ту же транзакцию, и последний оператор транзакции будет иметь коммит, который можно использовать для воспроизведения транзакции.
- server_id
- thread_id
- Добавьте параметр --output_ddl при запуске программы, вы можете захватить оператор ddl
- Столбец даты и времени будет выводиться как «ГГГГ-ММ-ДД чч: мм: сс», если он встретит «0000-00-00 00:00:00», он будет выводиться как есть.
- maxwell поддерживает несколько кодировок, но выводит только кодировку utf8
- TIMESTAMP Максвелла всегда обрабатывается как UTC. Если вы хотите настроить свой собственный часовой пояс, вам нужно обработать его в логике сервера.
Конфигурация, связанная с выходным форматом, выглядит следующим образом.
опции | значение параметра | описывать | По умолчанию |
---|---|---|---|
output_binlog_position |
BOOLEAN | Включать ли позицию binlog | false |
output_gtid_position |
BOOLEAN | Содержит позицию GTID | false |
output_commit_info |
BOOLEAN | Включать ли коммит и XID | true |
output_xoffset |
BOOLEAN | Включать ли виртуальное смещение TX-ряд | false |
output_nulls |
BOOLEAN | Включать ли поля со значением NULL | true |
output_server_id |
BOOLEAN | включать ли server_id | false |
output_thread_id |
BOOLEAN | включать ли thread_id | false |
output_schema_id |
BOOLEAN | включать ли schema_id | false |
output_row_query |
BOOLEAN | Содержит операторы вставки / обновления / удаления. MySQL нужно открытьbinlog_rows_query_log_events
|
false |
output_ddl |
BOOLEAN | Включать ли события DDL (table-alter, table-create и т. д.) | false |
output_null_zerodates |
BOOLEAN | Преобразовать «0000-00-00» в ноль? | false |
Расширенное использование
базовая конфигурация
опции | значение параметра | описывать | По умолчанию |
---|---|---|---|
config |
конфигурационный файлconfig.properties маршрут |
||
log_level |
[debug | info | warn | error] |
уровень журнала | info |
daemon |
Укажите экземпляр Maxwell для запуска в фоновом режиме в качестве демона. | ||
env_config_prefix |
STRING | Переменные среды, соответствующие этому префиксу, будут рассматриваться как значения конфигурации. |
Вы можете записать параметры запуска Maxwell в файл конфигурацииconfig.properties
, затем указывается с помощью параметра конфигурации,bin/maxwell --config config.properties
user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell
Параметры конфигурации MySQL
Максвелл делит MySQL на 3 роли в зависимости от использования:
-
host
: размещение, создание библиотечной таблицы maxwell, сохранение захваченной схемы и другой информации.- В основном есть шесть таблиц, начальная загрузка используется для инициализации данных, схемы записывают всю информацию о файле binlog, базы данных записывают всю информацию о базе данных, таблицы записывают всю информацию о таблицах, столбцы записывают всю информацию о полях, а записи о позициях считывают информацию о смещении binlog, Heartbeats записывает сердцебиение. Информация
-
replication_host
: Скопируйте хост, отслеживайте события, читайте бинарный журнал хоста.- будет
host
иreplication_host
Отдельно, чтобы избежатьreplication_user
Запись данных в производственный репозиторий
- будет
-
schema_host
:schema host, хост, который фиксирует схему структуры таблицы- В binlog нет информации о полях, поэтому maxwell нужно найти схему в базе данных и сохранить ее.
-
schema_host
Обычно не используется, но вbinlog-proxy
Сценарий очень практичный. Чтобы иметь такой автономный поток binlog JSON, сгенерированный Maxwell, поэтому нет собственного сервера mysql, структура для передачи только binlog, эта таблица времени, полученная из тормозного механизма, может schema_host.
Обычно эти три хоста одинаковы.schema_host
Есть толькоreplication_host
при использовании.
Следующие конфигурации относятся к MySQL
опции | значение параметра | описывать | По умолчанию |
---|---|---|---|
host |
STRING | адрес mysql | localhost |
user |
STRING | имя пользователя mysql | |
password |
STRING | пароль mysql | (no password) |
port |
INT | mysql порт 3306 | |
jdbc_options |
STRING | mysql jdbc connection options | DEFAULT_JDBC_OPTS |
ssl |
SSL_OPT | SSL behavior for mysql cx | DISABLED |
schema_database |
STRING | База данных, которая будет использоваться схемой и позицией, которую Максвелл использует для поддержания | maxwell |
client_id |
STRING | Уникальная строка, используемая для идентификации экземпляра Maxwell. | maxwell |
replica_server_id |
LONG | Уникальный номер, используемый для идентификации экземпляра Maxwell. | 6379 (see notes) |
master_recovery |
BOOLEAN | enable experimental master recovery code | false |
gtid_mode |
BOOLEAN | Следует ли открывать репликацию на основе GTID | false |
recapture_schema |
BOOLEAN | Восстановить последнюю структуру таблицы (схему), не настраиваемую в config.properties | false |
replication_host |
STRING | server to replicate from. See split server roles | schema-store host |
replication_password |
STRING | password on replication server | (none) |
replication_port |
INT | port on replication server | 3306 |
replication_user |
STRING | user on replication server | |
replication_ssl |
SSL_OPT | SSL behavior for replication cx cx | DISABLED |
schema_host |
STRING | server to capture schema from. See split server roles | schema-store host |
schema_password |
STRING | password on schema-capture server | (none) |
schema_port |
INT | port on schema-capture server | 3306 |
schema_user |
STRING | user on schema-capture server | |
schema_ssl |
SSL_OPT | SSL behavior for schema-capture server | DISABLED |
Конфигурация производителя
Представлена только kafka, а конфигурация других производителей подробно описана в официальной документации.
Kafka — это Maxwell, поддерживающий наиболее полный производитель и имеющий множество версий клиентов Kafka (0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.0.0.), по умолчанию kafka_version = 1.0.0 (текущая версия Maxwell 1.20.0)
Максвелл доставит сообщение на тему Кафки, темуkafka_topic
Опция указана, значение по умолчаниюmaxwell
В дополнение к указанию статической темы вы также можете указать динамическую, напримерnamespace_%{database}_%{table}
,%{database}
и%{table}
Будет заменена базой данных и таблицей для конкретных сообщений.
Когда Максвелл читает конфигурацию, если элемент конфигурацииkafka.
В начале конфигурация будет настроена на параметры подключения клиента Kafka Producer, такие как
kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5
Ниже приведены параметры конфигурации для универсального производителя Maxwell и производителя Kafka.
опции | значение параметра | описывать | По умолчанию |
---|---|---|---|
producer |
PRODUCER_TYPE | тип производителя | stdout |
custom_producer.factory |
CLASS_NAME | Заводской класс для пользовательского потребителя | |
producer_ack_timeout |
PRODUCER_ACK_TIMEOUT | Асинхронное потребление учитывает тайм-аут потери сообщения (мс) | |
producer_partition_by |
PARTITION_BY | Функция разделения для ввода в kafka/kinesis | database |
producer_partition_columns |
STRING | Если столбец, имена столбцов, разделенные запятыми | |
producer_partition_by_fallback |
PARTITION_BY_FALLBACK |
producer_partition_by=column требуется, когда столбец не существует используется |
|
ignore_producer_error |
BOOLEAN | При значении false выход из программы при возникновении ошибки в kafka/kinesis, при значении true записывается только лог См. такжеdead_letter_topic
|
true |
kafka.bootstrap.servers |
STRING | список кластеров кафки,HOST:PORT[,HOST:PORT]
|
|
kafka_topic |
STRING | kafka topic | maxwell |
dead_letter_topic |
STRING | Подробности смотрите в официальной документации | |
kafka_version |
KAFKA_VERSION | Указывает клиентскую версию maxwell производителя kafka, которую нельзя настроить в config.properties. | 0.11.0.1 |
kafka_partition_hash |
[default | murmur3] |
Метод хеширования для использования при выборе разделов kafka | default |
kafka_key_format |
[array | hash] |
how maxwell outputs kafka keys, either a hash or an array of hashes | hash |
ddl_kafka_topic |
STRING | когдаoutput_ddl При значении true все сообщения DDL будут доставляться в этот раздел. |
kafka_topic |
конфигурация фильтра
Максвелл может пройти--filter
Элементы конфигурации для указания правил фильтрации черезexclude
исключить, поinclude
Содержит, значение может быть определенной базой данных, таблицей данных, столбцом данных или даже использовать Javascript для определения сложных правил фильтрации; это может быть описано регулярными выражениями, есть несколько примеров с официального сайта
# 仅匹配foodb数据库的tbl表和所有table_数字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配db1数据库
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值为reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,完全排除bad_db数据库,若要恢复,必须删除maxwell库
--filter = 'blacklist: bad_db.*'
Инициализация данных
После начала Maxwell позиция последней остановки будет получена из библиотеки MaxWell, а Binlog будет прочитан с точки останова. Если бинлог был очищен, как можно скопировать весь стол через Maxwell? То есть как делать инициализацию данных?
Оперировать всей таблицей и искусственно генерировать бинлог? Например, найти поле, которое не влияет на бизнес, такое как update_time, затем добавить одну секунду, а затем вычесть одну секунду?
update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);
Очевидно, что здесь есть несколько больших проблем:
- Что делать, если неважное поле не существует? Каждое поле важно и не может быть изменено случайно!
- Если вся таблица большая, процесс модификации займет много времени и повлияет на бизнес!
- Будет генерировать много некоммерческого бинлога!
Maxwell предоставляет командный инструмент для проблемы инициализации данныхmaxwell-bootstrap
Помогите нам завершить инициализацию данных,maxwell-bootstrap
основан наSELECT * FROM table
способ инициализировать полный объем данных без создания избыточного бинлога!
Этот инструмент имеет следующие параметры:
параметр | инструкция |
---|---|
--log_level LOG_LEVEL |
Уровень журнала (DEBUG, INFO, WARN или ERROR) |
--user USER |
имя пользователя mysql |
--password PASSWORD |
пароль mysql |
--host HOST |
адрес mysql |
--port PORT |
порт mysql |
--database DATABASE |
База данных, в которой находится таблица для начальной загрузки |
--table TABLE |
стол для загрузки |
--where WHERE_CLAUSE |
установить фильтры |
--client_id CLIENT_ID |
Указывает экземпляр Maxwell для выполнения действия начальной загрузки. |
Поэкспериментируйте, следующее поможетtest
в базе данныхtest
Таблица, первая заключается в подготовке нескольких фрагментов данных для тестирования
INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');
потомreset master;
Очистить бинлог, удалить таблицы в библиотеке maxwell. Затем используйте команды быстрого запуска, чтобы запустить потребителей Kafka, Maxwell и Kafka, затем запуститеmaxwell-bootstrap
docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell \
--password 123456 --host 10.100.97.246 --database test --table test --client_id maxwell
Уведомление:--bootstrapper=sync
При обработке Bootstrap будет заблокирован обычный разбор Binlog;--bootstrapper=async
не заблокирует.
Вы также можете выполнить следующий SQL вmaxwell.bootstrap
Вставить записи в таблицу, запустить вручную
insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');
Вы можете увидеть управляемые данные на стороне потребителя kafka.
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}
4 посередине этоtest.test
данные binlog, обратите внимание, что тип здесьbootstrap-insert
, вместоinsert
.
Затем снова проверьте бинлог,show binlog events;
, вы обнаружите, что только сmaxwell
связанный binlog, и нетtest.test
связанный бинлог, поэтомуmaxwell-bootstrap
Команда не создает дополнительный двоичный журнал, что будет более очевидно при большом количестве листов данных.
Процесс начальной загрузкиbootstrap-start -> bootstrap-insert -> bootstrap-complete
, где поля данных начала и завершения пусты и не содержат данных.
В процессе бутстрапа, если максвелл крашится, при перезагрузке бутстрап запустится заново, сколько бы он ни был сделан до этого.Если вы этого не хотите, то можете задать это в базе данныхis_complete
Значение поля равно 1 (готово), или строка удалена
Максвелл Мониторинг
Максвелл предоставляетbase logging mechanism, JMX, HTTP or by push to Datadog
Для этих четырех методов мониторинга элементы конфигурации, связанные с мониторингом, следующие:
опции | значение параметра | описывать | По умолчанию |
---|---|---|---|
metrics_prefix |
STRING | префикс индикатора | MaxwellMetrics |
metrics_type |
[slf4j | jmx | http | datadog] |
Как публиковать метрики | |
metrics_jvm |
BOOLEAN | Собирать информацию о JVM | false |
metrics_slf4j_interval |
SECONDS | Как часто метрика зарегистрирована в журнал,metrics_type Должен быть настроен как slf4j |
60 |
http_port |
INT |
metrics_type Если это http, порт, на который публикуется индикатор |
8080 |
http_path_prefix |
STRING | префикс пути http | / |
http_bind_address |
STRING | http публикует адрес, привязанный к индикатору | all addresses |
http_diagnostic |
BOOLEAN | Включает ли http диагностический суффикс | false |
http_diagnostic_timeout |
MILLISECONDS | HTTP диагностическое время отклика | 10000 |
metrics_datadog_type |
[udp | http] |
metrics_type Как публиковать метрики для datadog |
udp |
metrics_datadog_tags |
STRING | Теги, предоставляемые datadog, такие как tag1:value1,tag2:value2 | |
metrics_datadog_interval |
INT | Как часто отправлять метрики в datadog, в секундах | 60 |
metrics_datadog_apikey |
STRING | когдаmetrics_datadog_type=http Ключ API, используемый datadog |
|
metrics_datadog_host |
STRING | когдаmetrics_datadog_type=udp Нажмите индекс, когда целевой адрес |
localhost |
metrics_datadog_port |
INT | когдаmetrics_datadog_type=udp Пуш-индикатор порта времени |
8125 |
Какие конкретные показатели мониторинга можно получить? Есть следующие, обратите внимание, что все индикаторы предварительно настроены с префиксами индикатораmetrics_prefix
показатель | тип | инструкция |
---|---|---|
messages.succeeded |
Counters | Количество сообщений, успешно отправленных в kafka |
messages.failed |
Counters | Количество сообщений, которые не удалось отправить |
row.count |
Counters | Количество обработанных строк бинлога, обратите внимание, что не все бинлоги отправляются в kafka |
messages.succeeded.meter |
Meters | Скорость, с которой сообщения были успешно отправлены в Kafka |
messages.failed.meter |
Meters | Скорость, с которой отправка сообщения не удается кафке |
row.meter |
Meters | Скорость, с которой строки поступают в maxwell из коннектора binlog |
replication.lag |
Gauges | Время (в миллисекундах), прошедшее между фиксацией транзакции базы данных и обработкой транзакции Maxwell. |
inflightmessages.count |
Gauges | Количество сообщений, обрабатываемых в данный момент (ожидающих подтверждения от получателя или до сообщения) |
message.publish.time |
Timers | Отправьте время (миллисекунды) в Кафку |
message.publish.age |
Timers | Время (в миллисекундах) между генерацией события из базы данных и его отправкой в Kafka с точностью +/- 500 мс. |
replication.queue.time |
Timers | Время в миллисекундах для отправки события binlog в очередь обработки |
Некоторые из вышеперечисленных индикаторов специфичны для Kafka и не поддерживаются всеми производителями.
Экспериментируйте, получайте показатели мониторинга через http
docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='10.100.97.246' --producer=kafka \
--kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
--metrics_type=http --metrics_jvm=true --http_port=8080
Большая часть приведенной выше конфигурации такая же, как и предыдущая, разница-p 8080:8080
сопоставление портов докера и--metrics_type=http --metrics_jvm=true --http_port=8080
, настроить публикацию индикаторов через http, включить сбор информации JVM, порт 8080, а дальше можно проходитьhttp://10.100.97.246:8080/metrics
Доступны все показатели
В методе http есть четыре суффикса, соответствующие четырем различным форматам.
endpoint | инструкция |
---|---|
/metrics |
Все метрики возвращаются в формате JSON |
/prometheus |
Все метрики возвращаются в формате Prometheus (Prometheus представляет собой комбинацию открытого исходного кода для мониторинга, оповещения и баз данных временных рядов). |
/healthcheck |
Возвращает, был ли Максвелл здоров в течение последних 15 минут. |
/ping |
Простой тест, который возвращаетpong
|
Если вы собираете индикаторы мониторинга Maxwell через JMX, вы можетеJAVA_OPTS
Переменные среды Настройка доступа к JMX
export JAVA_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9010 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=10.100.97.246"
Несколько экземпляров Maxwell
В различных конфигурациях Maxwell может запускать несколько экземпляров на одном и том же главном сервере. Это полезно, если вы хотите, чтобы производитель работал в другой конфигурации, например, для доставки событий из разных групп таблиц в разные темы. Каждый экземпляр Maxwell должен быть настроен с уникальным идентификатором client_id, чтобы различать расположение binlog.
Поддержка GTID.
Maxwell поддерживает репликацию на основе GTID, начиная с версии 1.8.0 (GTID-based replication), в режиме GTID Maxwell будет прозрачно выбирать новое место для репликации после смены хоста.
Что такое репликация GTID?
GTID (Global Transaction ID) — это номер совершенной транзакции, который является глобальным уникальным номером.
Начиная с MySQL 5.6.5, был добавлен метод репликации на основе GTID. GTID гарантирует, что каждая транзакция, зафиксированная на мастере, имеет уникальный идентификатор в кластере. Такой подход усиливает согласованность базы данных «главный-подчиненный», восстановление после сбоев и отказоустойчивость.
В исходной двоичной репликации на основе журнала подчиненная библиотека должна сообщить главной библиотеке, с какого смещения следует выполнять добавочную синхронизацию.Если указанная ошибка неверна, это приведет к пропуску данных, что приведет к несогласованности данных. С помощью GTID в случае переключения master-slave другие ведомые библиотеки MySQL могут автоматически находить правильное место репликации в новой master базе данных, что значительно упрощает обслуживание кластера при сложной топологии репликации и снижает вероятность искусственного установка места репликации Риск неправильного использования. Кроме того, репликация на основе GTID может игнорировать уже выполненные транзакции, что снижает риск несогласованности данных.
Меры предосторожности
timestamp column
maxwell для типов времени (datetime, timestamp, date)Рассматривать как строкуДа, это также необходимо для обеспечения согласованности данных (например,0000-00-00 00:00:00
Такое время является недопустимым в метке времени, но mysql распознает, что оно равно null/None при анализе в типе java или python).
Если поле в таблице MySQL имеет тип временной метки, это концепция часового пояса,Бинлог анализирует стандартное время UTC.Но пользователь видит местное время. Напримерf_create_time timestamp
Время создания - пекинское время.2018-01-05 21:01:01
, то на самом деле mysql хранит2018-01-05 13:01:01
, что также является строкой времени в binlog. Если нет потребителя и нет преобразования часового пояса, это будет на 8 часов меньше.
Вместо того, чтобы рассматривать эту проблему для каждого клиента, я думаю, что более разумно предоставить параметры часового пояса, а затем maxwell автоматически обрабатывает проблему часового пояса, в противном случае клиенту нужно сначала знать, какие столбцы являются типами меток времени, или подключиться к оригинальный библиотечный кеш для этих типов.
binary column
maxwell может обрабатывать столбцы двоичного типа, такие как blob, varbinary, и его подход заключается в использовании двоичных столбцов.base64_encode
, как строковый вывод в json. После того, как потребители получат этот столбец данных, они не могут быть собраны напрямую, им нужноbase64_decode
.
Структура таблицы не синхронизирована
Если вы возьмете более старый бинлог и поместите его на новый сервер mysql и используете maxwell для его извлечения, возможно, структура таблицы изменилась, например соотношение полей в бинлогеschema_host
В нем есть еще одно поле. На данный момент никаких отклонений от нормы в этой ситуации обнаружено не было, например, Alibaba RDS по умолчанию добавит таблицу без первичного ключа и без уникального индекса.__##alibaba_rds_rowid##__
,существуетshow create table
иschema
В ней этого скрытого первичного ключа не видно, но он будет в бинлоге, который будет синхронизирован с ведомой библиотекой.
Кроме того, мы управляли версией структуры через git, если есть такой сценарий, мы также можем справиться с этим.
Большая транзакция Binlog
Когда объем двоичного журнала, сгенерированного транзакцией, очень велик, например при переносе данных ежедневной таблицы, для контроля использования памяти maxwell автоматически помещает необработанный двоичный журнал в файловую систему.
Using kafka version: 0.11.0.1
21:16:07,109 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events
Но столкнулись с другой проблемой, переполнение, затем ненормальноеEventDataDeserializationException: Failed to deserialize data of EventHeaderV4
, Когда я запускаю другую позицию binlog до того, как maxwell укажет на начало синтаксического анализа, исключение не создается. Посмертные данные также показали, что данные не были потеряны.
Причина проблемы до сих пор неизвестна.Caused by: java.net.SocketException: Connection reset
, такое ощущение, что при чтении потока binlog не было прочитано полное событие, и соединение аварийно закрывается. Эта проблема относительно упорная, и аналогичные проблемы на github не достигли четкого решения. (Это также говорит нам со стороны о том, что миграция больших табличных данных также должна производиться пакетами, а не однимinsert into .. select
Возьми
03:18:20,586 INFO ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
... 5 more
03:19:31,514 INFO BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)
tableMapCache
Как упоминалось ранее, если я хочу получить только изменения binlog определенных таблиц, мне нужно использовать include_tables для фильтрации, но если таблица t1 удалена на сервере mysql, но мой binlog был прочитан со вчерашнего дня, тот, который был удалена Таблица t1 не может извлечь структуру таблицы при запуске maxwell. Тогда вчерашний бинлог имеет изменение t1, потому что структуру таблицы не найти, чтобы собрать ее в json, и будет выброшено исключение.
вручную вmaxwell.tables/columns
Внутри можно вставлять записи. Но корень этой проблемы в том, что maxwell обрабатывает row_event только при фильтрации binlog и требует, чтобы все таблицы в binlog имели tableMapCache.
Я (seanlook) отправил фиксацию и могу запросить кэширование таблиц include_dbs/tables только при выполнении tableMapCache: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb
Улучшить потребительские качества
При использовании rabbitmq,routing_key
да%db%.%table%
, но приращение binlog, генерируемое некоторыми таблицами, очень велико, что приведет к тому, что количество сообщений в каждой очереди будет очень неравномерным.В настоящее время, поскольку параллельное воспроизведение на уровне транзакции xid или thread_id еще не достигнуто, минимальная очередь гранулярность тоже таблица, попробуй поставить одну отдельно Очередь, а остальные небольшие объемы данных объединяются вместе.
binlog
Максвелл поддерживает некоторые из причин, по которым информация о бинарном журнале Максвелла вытесняется в библиотеку, например, из-заreset master;
, записи в библиотеке maxwell не соответствуют фактическому binlog, и будет сообщено об исключении Это нужно вручную исправить смещение binlog или напрямую очистить/удалить библиотеку maxwell для перестроения
com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)
а также
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
at java.lang.Thread.run(Thread.java:748)