Подробное объяснение инструмента синтаксического анализа MySQL Binlog Maxwell

MySQL

Подробное объяснение инструмента синтаксического анализа MySQL Binlog Maxwell

Максвелл Профиль

Maxwell — это приложение, которое может читать двоичный журнал двоичного журнала MySQL в режиме реального времени и генерировать сообщения в формате JSON в качестве производителя для отправки на Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, файловые или другие платформы. Его общие сценарии применения включают ETL, обслуживание кэша, сбор dml-индикаторов на уровне таблиц, инкрементный переход к поисковым системам, миграцию разделов данных и схемы отката binlog для сокращения базы данных. Официальный сайт (http://maxwells-daemon.io), GitHub (https://github.com/zendesk/maxwell)

Maxwell в основном предоставляет следующие функции:

  • служба поддержкиSELECT * FROM tableспособ инициализировать полный объем данных
  • Поддержка автоматического восстановления местоположения binlog (GTID) после аварийного переключения в основной библиотеке.
  • Данные могут быть секционированы для решения проблемы перекоса данных, а данные, отправляемые в Kafka, поддерживают секционирование данных на уровне базы данных, таблицы, столбца и других уровнях.
  • Работа маскируется под Slave, получает события binlog, а затем собирается в соответствии со схемами информации, может принимать различные события ddl, xid, row и т.д.

В дополнение к Maxwell, в настоящее время широко используемые инструменты для синтаксического анализа MySQL Binlog в основном включают канал Али и mysql_streamer. Три инструмента сравниваются следующим образом:

canal、maxwell、mysql_streamer对比

Canal разработан на Java и разделен на сервер и клиент, у него много производных приложений со стабильной производительностью и мощными функциями, canal должен написать свой собственный клиент для использования данных, проанализированных canal.

Преимущество maxwell перед canal в том, что он прост в использовании, он напрямую выводит изменения данных в виде строк json без необходимости писать клиент.

быстрый старт

Во-первых, MySQL должен включить BinLog сначала. Для чего есть Binlog Mysql, вы можете обратиться к статье »Введение в Бинлог MySQL

$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row

Создайте пользователя Maxwell и дайте некоторые разрешения для библиотеки maxwell.

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

Вам нужно запустить kafka перед использованием maxwell

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 启动Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

Перед запуском kafka на одной машине необходимо изменить файл конфигурации и открыть файл конфигурации.vi config/server.properties, добавьте в конец файлаadvertised.host.nameКонфигурация, значение — IP машины, на которой стоит кафка

advertised.host.name=10.100.97.246

В противном случае будет сообщено об исключении, когда maxwell будет запущен позже через докер (hadoop2 — это имя моего хоста)

17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
        at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
        at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
        ... 6 more

Затем вы можете начать кафку

bin/kafka-server-start.sh config/server.properties

тест кафка

# 创建一个 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 列出所有 topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 启动一个生产者,然后随意发送一些消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

# 在另一个终端启动一下消费者,观察所消费的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

Быстро установить и использовать Maxwell через докер (конечно, вам нужно установить докер самостоятельно)

# 拉取镜像 
docker pull zendesk/maxwell

# 启动maxwell,并将解析出的binlog输出到控制台
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout

Протестируйте Максвелла, сначала создайте простую таблицу, затем добавляйте, изменяйте и удаляйте данные

CREATE TABLE `test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
insert into test values(1,22,"小旋锋");
update test set name='whirly' where id=1;
delete from test where id=1;

Наблюдайте за выводом консоли Docker.Из журнала вывода вы можете увидеть формат строки JSON binlog, проанализированной Maxwell.

{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋锋"}}
{"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋锋"}}
{"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}

Вывод в кафка, Закрыть докер, сброс параметров запуска

docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug

Затем запустите потребитель, чтобы потреблять сообщения темы maxwell и наблюдать за его выводом; снова выполните SQL для добавления, изменения и удаления данных, вы все равно можете получить тот же результат.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell

Формат выходной строки JSON

  • Данные Последние данные, модифицированные данные
  • старые старые данные, данные до модификации
  • введите тип операции, здесь вставка, обновление, удаление, создание базы данных, изменение базы данных, удаление базы данных, создание таблицы, изменение таблицы, удаление таблицы, вставка начальной загрузки, int (неизвестный тип)
  • ID транзакции XID
  • commit Один и тот же xid представляет ту же транзакцию, и последний оператор транзакции будет иметь коммит, который можно использовать для воспроизведения транзакции.
  • server_id
  • thread_id
  • Добавьте параметр --output_ddl при запуске программы, вы можете захватить оператор ddl
  • Столбец даты и времени будет выводиться как «ГГГГ-ММ-ДД чч: мм: сс», если он встретит «0000-00-00 00:00:00», он будет выводиться как есть.
  • maxwell поддерживает несколько кодировок, но выводит только кодировку utf8
  • TIMESTAMP Максвелла всегда обрабатывается как UTC. Если вы хотите настроить свой собственный часовой пояс, вам нужно обработать его в логике сервера.

Конфигурация, связанная с выходным форматом, выглядит следующим образом.

опции значение параметра описывать По умолчанию
output_binlog_position BOOLEAN Включать ли позицию binlog false
output_gtid_position BOOLEAN Содержит позицию GTID false
output_commit_info BOOLEAN Включать ли коммит и XID true
output_xoffset BOOLEAN Включать ли виртуальное смещение TX-ряд false
output_nulls BOOLEAN Включать ли поля со значением NULL true
output_server_id BOOLEAN включать ли server_id false
output_thread_id BOOLEAN включать ли thread_id false
output_schema_id BOOLEAN включать ли schema_id false
output_row_query BOOLEAN Содержит операторы вставки / обновления / удаления. MySQL нужно открытьbinlog_rows_query_log_events false
output_ddl BOOLEAN Включать ли события DDL (table-alter, table-create и т. д.) false
output_null_zerodates BOOLEAN Преобразовать «0000-00-00» в ноль? false

Расширенное использование

базовая конфигурация

опции значение параметра описывать По умолчанию
config конфигурационный файлconfig.propertiesмаршрут
log_level [debug | info | warn | error] уровень журнала info
daemon Укажите экземпляр Maxwell для запуска в фоновом режиме в качестве демона.
env_config_prefix STRING Переменные среды, соответствующие этому префиксу, будут рассматриваться как значения конфигурации.

Вы можете записать параметры запуска Maxwell в файл конфигурацииconfig.properties, затем указывается с помощью параметра конфигурации,bin/maxwell --config config.properties

user=maxwell
password=123456
host=10.100.97.246
producer=kafka
kafka.bootstrap.servers=10.100.97.246:9092
kafka_topic=maxwell

Параметры конфигурации MySQL

Максвелл делит MySQL на 3 роли в зависимости от использования:

  • host: размещение, создание библиотечной таблицы maxwell, сохранение захваченной схемы и другой информации.

    • В основном есть шесть таблиц, начальная загрузка используется для инициализации данных, схемы записывают всю информацию о файле binlog, базы данных записывают всю информацию о базе данных, таблицы записывают всю информацию о таблицах, столбцы записывают всю информацию о полях, а записи о позициях считывают информацию о смещении binlog, Heartbeats записывает сердцебиение. Информация
  • replication_host: Скопируйте хост, отслеживайте события, читайте бинарный журнал хоста.

    • будетhostиreplication_hostОтдельно, чтобы избежатьreplication_userЗапись данных в производственный репозиторий
  • schema_host:schema host, хост, который фиксирует схему структуры таблицы

    • В binlog нет информации о полях, поэтому maxwell нужно найти схему в базе данных и сохранить ее.
    • schema_hostОбычно не используется, но вbinlog-proxyСценарий очень практичный. Чтобы иметь такой автономный поток binlog JSON, сгенерированный Maxwell, поэтому нет собственного сервера mysql, структура для передачи только binlog, эта таблица времени, полученная из тормозного механизма, может schema_host.

Обычно эти три хоста одинаковы.schema_hostЕсть толькоreplication_hostпри использовании.

Следующие конфигурации относятся к MySQL

опции значение параметра описывать По умолчанию
host STRING адрес mysql localhost
user STRING имя пользователя mysql
password STRING пароль mysql (no password)
port INT mysql порт 3306
jdbc_options STRING mysql jdbc connection options DEFAULT_JDBC_OPTS
ssl SSL_OPT SSL behavior for mysql cx DISABLED
schema_database STRING База данных, которая будет использоваться схемой и позицией, которую Максвелл использует для поддержания maxwell
client_id STRING Уникальная строка, используемая для идентификации экземпляра Maxwell. maxwell
replica_server_id LONG Уникальный номер, используемый для идентификации экземпляра Maxwell. 6379 (see notes)
master_recovery BOOLEAN enable experimental master recovery code false
gtid_mode BOOLEAN Следует ли открывать репликацию на основе GTID false
recapture_schema BOOLEAN Восстановить последнюю структуру таблицы (схему), не настраиваемую в config.properties false
replication_host STRING server to replicate from. See split server roles schema-store host
replication_password STRING password on replication server (none)
replication_port INT port on replication server 3306
replication_user STRING user on replication server
replication_ssl SSL_OPT SSL behavior for replication cx cx DISABLED
schema_host STRING server to capture schema from. See split server roles schema-store host
schema_password STRING password on schema-capture server (none)
schema_port INT port on schema-capture server 3306
schema_user STRING user on schema-capture server
schema_ssl SSL_OPT SSL behavior for schema-capture server DISABLED

Конфигурация производителя

Представлена ​​только kafka, а конфигурация других производителей подробно описана в официальной документации.

Kafka — это Maxwell, поддерживающий наиболее полный производитель и имеющий множество версий клиентов Kafka (0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.0.0.), по умолчанию kafka_version = 1.0.0 (текущая версия Maxwell 1.20.0)

Максвелл доставит сообщение на тему Кафки, темуkafka_topicОпция указана, значение по умолчаниюmaxwellВ дополнение к указанию статической темы вы также можете указать динамическую, напримерnamespace_%{database}_%{table},%{database}и%{table}Будет заменена базой данных и таблицей для конкретных сообщений.

Когда Максвелл читает конфигурацию, если элемент конфигурацииkafka.В начале конфигурация будет настроена на параметры подключения клиента Kafka Producer, такие как

kafka.acks = 1
kafka.compression.type = snappy
kafka.retries=5

Ниже приведены параметры конфигурации для универсального производителя Maxwell и производителя Kafka.

опции значение параметра описывать По умолчанию
producer PRODUCER_TYPE тип производителя stdout
custom_producer.factory CLASS_NAME Заводской класс для пользовательского потребителя
producer_ack_timeout PRODUCER_ACK_TIMEOUT Асинхронное потребление учитывает тайм-аут потери сообщения (мс)
producer_partition_by PARTITION_BY Функция разделения для ввода в kafka/kinesis database
producer_partition_columns STRING Если столбец, имена столбцов, разделенные запятыми
producer_partition_by_fallback PARTITION_BY_FALLBACK producer_partition_by=columnтребуется, когда столбец не существует используется
ignore_producer_error BOOLEAN При значении false выход из программы при возникновении ошибки в kafka/kinesis, при значении true записывается только лог См. такжеdead_letter_topic true
kafka.bootstrap.servers STRING список кластеров кафки,HOST:PORT[,HOST:PORT]
kafka_topic STRING kafka topic maxwell
dead_letter_topic STRING Подробности смотрите в официальной документации
kafka_version KAFKA_VERSION Указывает клиентскую версию maxwell производителя kafka, которую нельзя настроить в config.properties. 0.11.0.1
kafka_partition_hash [default | murmur3] Метод хеширования для использования при выборе разделов kafka default
kafka_key_format [array | hash] how maxwell outputs kafka keys, either a hash or an array of hashes hash
ddl_kafka_topic STRING когдаoutput_ddlПри значении true все сообщения DDL будут доставляться в этот раздел. kafka_topic

конфигурация фильтра

Максвелл может пройти--filterЭлементы конфигурации для указания правил фильтрации черезexcludeисключить, поincludeСодержит, значение может быть определенной базой данных, таблицей данных, столбцом данных или даже использовать Javascript для определения сложных правил фильтрации; это может быть описано регулярными выражениями, есть несколько примеров с официального сайта

# 仅匹配foodb数据库的tbl表和所有table_数字的表
--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配db1数据库
--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值为reject的所有更新
--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新
--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,完全排除bad_db数据库,若要恢复,必须删除maxwell库
--filter = 'blacklist: bad_db.*' 

Инициализация данных

После начала Maxwell позиция последней остановки будет получена из библиотеки MaxWell, а Binlog будет прочитан с точки останова. Если бинлог был очищен, как можно скопировать весь стол через Maxwell? То есть как делать инициализацию данных?

Оперировать всей таблицей и искусственно генерировать бинлог? Например, найти поле, которое не влияет на бизнес, такое как update_time, затем добавить одну секунду, а затем вычесть одну секунду?

update test set update_time = DATE_ADD(update_time,intever 1 second);
update test set update_time = DATE_ADD(update_time,intever -1 second);

Очевидно, что здесь есть несколько больших проблем:

  • Что делать, если неважное поле не существует? Каждое поле важно и не может быть изменено случайно!
  • Если вся таблица большая, процесс модификации займет много времени и повлияет на бизнес!
  • Будет генерировать много некоммерческого бинлога!

Maxwell предоставляет командный инструмент для проблемы инициализации данныхmaxwell-bootstrapПомогите нам завершить инициализацию данных,maxwell-bootstrapоснован наSELECT * FROM tableспособ инициализировать полный объем данных без создания избыточного бинлога!

Этот инструмент имеет следующие параметры:

параметр инструкция
--log_level LOG_LEVEL Уровень журнала (DEBUG, INFO, WARN или ERROR)
--user USER имя пользователя mysql
--password PASSWORD пароль mysql
--host HOST адрес mysql
--port PORT порт mysql
--database DATABASE База данных, в которой находится таблица для начальной загрузки
--table TABLE стол для загрузки
--where WHERE_CLAUSE установить фильтры
--client_id CLIENT_ID Указывает экземпляр Maxwell для выполнения действия начальной загрузки.

Поэкспериментируйте, следующее поможетtestв базе данныхtestТаблица, первая заключается в подготовке нескольких фрагментов данных для тестирования

INSERT INTO `test` VALUES (1, 1, '1');
INSERT INTO `test` VALUES (2, 2, '2');
INSERT INTO `test` VALUES (3, 3, '3');
INSERT INTO `test` VALUES (4, 4, '4');

потомreset master;Очистить бинлог, удалить таблицы в библиотеке maxwell. Затем используйте команды быстрого запуска, чтобы запустить потребителей Kafka, Maxwell и Kafka, затем запуститеmaxwell-bootstrap

docker run -it --rm zendesk/maxwell bin/maxwell-bootstrap --user maxwell  \
    --password 123456 --host 10.100.97.246  --database test --table test --client_id maxwell

Уведомление:--bootstrapper=syncПри обработке Bootstrap будет заблокирован обычный разбор Binlog;--bootstrapper=asyncне заблокирует.

Вы также можете выполнить следующий SQL вmaxwell.bootstrapВставить записи в таблицу, запустить вручную

insert into maxwell.bootstrap (database_name, table_name) values ('test', 'test');

Вы можете увидеть управляемые данные на стороне потребителя kafka.

{"database":"maxwell","table":"bootstrap","type":"insert","ts":1552199115,"xid":36738,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell"}}
{"database":"test","table":"test","type":"bootstrap-start","ts":1552199115,"data":{}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":1,"age":1,"name":"1"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":2,"age":2,"name":"2"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":3,"age":3,"name":"3"}}
{"database":"test","table":"test","type":"bootstrap-insert","ts":1552199115,"data":{"id":4,"age":4,"name":"4"}}
{"database":"maxwell","table":"bootstrap","type":"update","ts":1552199115,"xid":36756,"commit":true,"data":{"id":3,"database_name":"test","table_name":"test","where_clause":null,"is_complete":1,"inserted_rows":4,"total_rows":0,"created_at":null,"started_at":"2019-03-10 14:25:15","completed_at":"2019-03-10 14:25:15","binlog_file":"mysql-bin.000001","binlog_position":64446,"client_id":"maxwell"},"old":{"is_complete":0,"inserted_rows":1,"completed_at":null}}
{"database":"test","table":"test","type":"bootstrap-complete","ts":1552199115,"data":{}}

4 посередине этоtest.testданные binlog, обратите внимание, что тип здесьbootstrap-insert, вместоinsert.

Затем снова проверьте бинлог,show binlog events;, вы обнаружите, что только сmaxwellсвязанный binlog, и нетtest.testсвязанный бинлог, поэтомуmaxwell-bootstrapКоманда не создает дополнительный двоичный журнал, что будет более очевидно при большом количестве листов данных.

Процесс начальной загрузкиbootstrap-start -> bootstrap-insert -> bootstrap-complete, где поля данных начала и завершения пусты и не содержат данных.

В процессе бутстрапа, если максвелл крашится, при перезагрузке бутстрап запустится заново, сколько бы он ни был сделан до этого.Если вы этого не хотите, то можете задать это в базе данныхis_completeЗначение поля равно 1 (готово), или строка удалена

Максвелл Мониторинг

Максвелл предоставляетbase logging mechanism, JMX, HTTP or by push to DatadogДля этих четырех методов мониторинга элементы конфигурации, связанные с мониторингом, следующие:

опции значение параметра описывать По умолчанию
metrics_prefix STRING префикс индикатора MaxwellMetrics
metrics_type [slf4j | jmx | http | datadog] Как публиковать метрики
metrics_jvm BOOLEAN Собирать информацию о JVM false
metrics_slf4j_interval SECONDS Как часто метрика зарегистрирована в журнал,metrics_typeДолжен быть настроен как slf4j 60
http_port INT metrics_typeЕсли это http, порт, на который публикуется индикатор 8080
http_path_prefix STRING префикс пути http /
http_bind_address STRING http публикует адрес, привязанный к индикатору all addresses
http_diagnostic BOOLEAN Включает ли http диагностический суффикс false
http_diagnostic_timeout MILLISECONDS HTTP диагностическое время отклика 10000
metrics_datadog_type [udp | http] metrics_typeКак публиковать метрики для datadog udp
metrics_datadog_tags STRING Теги, предоставляемые datadog, такие как tag1:value1,tag2:value2
metrics_datadog_interval INT Как часто отправлять метрики в datadog, в секундах 60
metrics_datadog_apikey STRING когдаmetrics_datadog_type=httpКлюч API, используемый datadog
metrics_datadog_host STRING когдаmetrics_datadog_type=udpНажмите индекс, когда целевой адрес localhost
metrics_datadog_port INT когдаmetrics_datadog_type=udpПуш-индикатор порта времени 8125

Какие конкретные показатели мониторинга можно получить? Есть следующие, обратите внимание, что все индикаторы предварительно настроены с префиксами индикатораmetrics_prefix

показатель тип инструкция
messages.succeeded Counters Количество сообщений, успешно отправленных в kafka
messages.failed Counters Количество сообщений, которые не удалось отправить
row.count Counters Количество обработанных строк бинлога, обратите внимание, что не все бинлоги отправляются в kafka
messages.succeeded.meter Meters Скорость, с которой сообщения были успешно отправлены в Kafka
messages.failed.meter Meters Скорость, с которой отправка сообщения не удается кафке
row.meter Meters Скорость, с которой строки поступают в maxwell из коннектора binlog
replication.lag Gauges Время (в миллисекундах), прошедшее между фиксацией транзакции базы данных и обработкой транзакции Maxwell.
inflightmessages.count Gauges Количество сообщений, обрабатываемых в данный момент (ожидающих подтверждения от получателя или до сообщения)
message.publish.time Timers Отправьте время (миллисекунды) в Кафку
message.publish.age Timers Время (в миллисекундах) между генерацией события из базы данных и его отправкой в ​​Kafka с точностью +/- 500 мс.
replication.queue.time Timers Время в миллисекундах для отправки события binlog в очередь обработки

Некоторые из вышеперечисленных индикаторов специфичны для Kafka и не поддерживаются всеми производителями.

Экспериментируйте, получайте показатели мониторинга через http

docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
    --password='123456' --host='10.100.97.246' --producer=kafka \
    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \
    --metrics_type=http  --metrics_jvm=true --http_port=8080

Большая часть приведенной выше конфигурации такая же, как и предыдущая, разница-p 8080:8080сопоставление портов докера и--metrics_type=http --metrics_jvm=true --http_port=8080, настроить публикацию индикаторов через http, включить сбор информации JVM, порт 8080, а дальше можно проходитьhttp://10.100.97.246:8080/metricsДоступны все показатели

Maxwell监控

В методе http есть четыре суффикса, соответствующие четырем различным форматам.

endpoint инструкция
/metrics Все метрики возвращаются в формате JSON
/prometheus Все метрики возвращаются в формате Prometheus (Prometheus представляет собой комбинацию открытого исходного кода для мониторинга, оповещения и баз данных временных рядов).
/healthcheck Возвращает, был ли Максвелл здоров в течение последних 15 минут.
/ping Простой тест, который возвращаетpong

Если вы собираете индикаторы мониторинга Maxwell через JMX, вы можетеJAVA_OPTSПеременные среды Настройка доступа к JMX

export JAVA_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9010 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.rmi.server.hostname=10.100.97.246"

Несколько экземпляров Maxwell

В различных конфигурациях Maxwell может запускать несколько экземпляров на одном и том же главном сервере. Это полезно, если вы хотите, чтобы производитель работал в другой конфигурации, например, для доставки событий из разных групп таблиц в разные темы. Каждый экземпляр Maxwell должен быть настроен с уникальным идентификатором client_id, чтобы различать расположение binlog.

Поддержка GTID.

Maxwell поддерживает репликацию на основе GTID, начиная с версии 1.8.0 (GTID-based replication), в режиме GTID Maxwell будет прозрачно выбирать новое место для репликации после смены хоста.

Что такое репликация GTID?

GTID (Global Transaction ID) — это номер совершенной транзакции, который является глобальным уникальным номером.

Начиная с MySQL 5.6.5, был добавлен метод репликации на основе GTID. GTID гарантирует, что каждая транзакция, зафиксированная на мастере, имеет уникальный идентификатор в кластере. Такой подход усиливает согласованность базы данных «главный-подчиненный», восстановление после сбоев и отказоустойчивость.

В исходной двоичной репликации на основе журнала подчиненная библиотека должна сообщить главной библиотеке, с какого смещения следует выполнять добавочную синхронизацию.Если указанная ошибка неверна, это приведет к пропуску данных, что приведет к несогласованности данных. С помощью GTID в случае переключения master-slave другие ведомые библиотеки MySQL могут автоматически находить правильное место репликации в новой master базе данных, что значительно упрощает обслуживание кластера при сложной топологии репликации и снижает вероятность искусственного установка места репликации Риск неправильного использования. Кроме того, репликация на основе GTID может игнорировать уже выполненные транзакции, что снижает риск несогласованности данных.

Меры предосторожности

timestamp column

maxwell для типов времени (datetime, timestamp, date)Рассматривать как строкуДа, это также необходимо для обеспечения согласованности данных (например,0000-00-00 00:00:00Такое время является недопустимым в метке времени, но mysql распознает, что оно равно null/None при анализе в типе java или python).

Если поле в таблице MySQL имеет тип временной метки, это концепция часового пояса,Бинлог анализирует стандартное время UTC.Но пользователь видит местное время. Напримерf_create_time timestampВремя создания - пекинское время.2018-01-05 21:01:01, то на самом деле mysql хранит2018-01-05 13:01:01, что также является строкой времени в binlog. Если нет потребителя и нет преобразования часового пояса, это будет на 8 часов меньше.

Вместо того, чтобы рассматривать эту проблему для каждого клиента, я думаю, что более разумно предоставить параметры часового пояса, а затем maxwell автоматически обрабатывает проблему часового пояса, в противном случае клиенту нужно сначала знать, какие столбцы являются типами меток времени, или подключиться к оригинальный библиотечный кеш для этих типов.

binary column

maxwell может обрабатывать столбцы двоичного типа, такие как blob, varbinary, и его подход заключается в использовании двоичных столбцов.base64_encode, как строковый вывод в json. После того, как потребители получат этот столбец данных, они не могут быть собраны напрямую, им нужноbase64_decode.

Структура таблицы не синхронизирована

Если вы возьмете более старый бинлог и поместите его на новый сервер mysql и используете maxwell для его извлечения, возможно, структура таблицы изменилась, например соотношение полей в бинлогеschema_hostВ нем есть еще одно поле. На данный момент никаких отклонений от нормы в этой ситуации обнаружено не было, например, Alibaba RDS по умолчанию добавит таблицу без первичного ключа и без уникального индекса.__##alibaba_rds_rowid##__,существуетshow create tableиschemaВ ней этого скрытого первичного ключа не видно, но он будет в бинлоге, который будет синхронизирован с ведомой библиотекой.

Кроме того, мы управляли версией структуры через git, если есть такой сценарий, мы также можем справиться с этим.

Большая транзакция Binlog

Когда объем двоичного журнала, сгенерированного транзакцией, очень велик, например при переносе данных ежедневной таблицы, для контроля использования памяти maxwell автоматически помещает необработанный двоичный журнал в файловую систему.

Using kafka version: 0.11.0.1
21:16:07,109 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
21:16:07,380 INFO  SchemaStoreSchema - Creating maxwell database
21:16:07,540 INFO  Maxwell - Maxwell v?? is booting (RabbitmqProducer), starting at Position[BinlogPosition[mysql-bin.006235:24980714],
lastHeartbeat=0]
21:16:07,649 INFO  AbstractSchemaStore - Maxwell is capturing initial schema
21:16:08,267 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.006235:24980714
21:16:08,324 INFO  BinaryLogClient - Connected to rm-xxxxxxxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006235/24980714 (sid:637
9, cid:9182598)
21:16:08,325 INFO  BinlogConnectorLifecycleListener - Binlog connected.
03:15:36,104 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell7935334910787514257events
03:17:14,880 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell3143086481692829045events

Но столкнулись с другой проблемой, переполнение, затем ненормальноеEventDataDeserializationException: Failed to deserialize data of EventHeaderV4, Когда я запускаю другую позицию binlog до того, как maxwell укажет на начало синтаксического анализа, исключение не создается. Посмертные данные также показали, что данные не были потеряны.

Причина проблемы до сих пор неизвестна.Caused by: java.net.SocketException: Connection reset, такое ощущение, что при чтении потока binlog не было прочитано полное событие, и соединение аварийно закрывается. Эта проблема относительно упорная, и аналогичные проблемы на github не достигли четкого решения. (Это также говорит нам со стороны о том, что миграция больших табличных данных также должна производиться пакетами, а не однимinsert into .. selectВозьми

03:18:20,586 INFO  ListWithDiskBuffer - Overflowed in-memory buffer, spilling over into /tmp/maxwell5229190074667071141events
03:19:31,289 WARN  BinlogConnectorLifecycleListener - Communication failure.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{time
stamp=1514920657000, eventType=WRITE_ROWS, serverId=2115082720, headerLength=19, dataLength=8155, nextPosition=520539918, flags=0}
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:216) ~[mys
ql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:184) ~[mysql-binlog-c
onnector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:890) [mysql-binlog-connector-java-0
.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559) [mysql-binlog-connector-java-0.13.0.jar:0.13
.0]
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793) [mysql-binlog-connector-java-0.13.0.jar:0.13.0
]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[?:1.8.0_121]
        at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_121]
        at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51) ~[mysql-binlog-connector-
java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:202) ~[mysql-binlo
g-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:184) ~[mysql-binlog-connector-java-0.13
.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46) ~[mysql-binlog-connector-jav
a-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeLong(AbstractRowsEventDataD
eserializer.java:212) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataD
eserializer.java:150) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:132) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:210) ~[mysql-binlog-connector-java-0.13.0.jar:0.13.0]
        ... 5 more
03:19:31,514 INFO  BinlogConnectorLifecycleListener - Binlog disconnected.
03:19:31,590 WARN  BinlogConnectorReplicator - replicator stopped at position: mysql-bin.006236:520531744 -- restarting
03:19:31,595 INFO  BinaryLogClient - Connected to rm-xxxxxx.mysql.rds.aliyuncs.com:3306 at mysql-bin.006236/520531744 (sid:6379, cid:9220521)

tableMapCache

Как упоминалось ранее, если я хочу получить только изменения binlog определенных таблиц, мне нужно использовать include_tables для фильтрации, но если таблица t1 удалена на сервере mysql, но мой binlog был прочитан со вчерашнего дня, тот, который был удалена Таблица t1 не может извлечь структуру таблицы при запуске maxwell. Тогда вчерашний бинлог имеет изменение t1, потому что структуру таблицы не найти, чтобы собрать ее в json, и будет выброшено исключение.

вручную вmaxwell.tables/columnsВнутри можно вставлять записи. Но корень этой проблемы в том, что maxwell обрабатывает row_event только при фильтрации binlog и требует, чтобы все таблицы в binlog имели tableMapCache.

Я (seanlook) отправил фиксацию и могу запросить кэширование таблиц include_dbs/tables только при выполнении tableMapCache: https://github.com/seanlook/maxwell/commit/2618b70303078bf910a1981b69943cca75ee04fb

Улучшить потребительские качества

При использовании rabbitmq,routing_keyда%db%.%table%, но приращение binlog, генерируемое некоторыми таблицами, очень велико, что приведет к тому, что количество сообщений в каждой очереди будет очень неравномерным.В настоящее время, поскольку параллельное воспроизведение на уровне транзакции xid или thread_id еще не достигнуто, минимальная очередь гранулярность тоже таблица, попробуй поставить одну отдельно Очередь, а остальные небольшие объемы данных объединяются вместе.

binlog

Максвелл поддерживает некоторые из причин, по которым информация о бинарном журнале Максвелла вытесняется в библиотеку, например, из-заreset master;, записи в библиотеке maxwell не соответствуют фактическому binlog, и будет сообщено об исключении Это нужно вручную исправить смещение binlog или напрямую очистить/удалить библиотеку maxwell для перестроения

com.github.shyiko.mysql.binlog.network.ServerException: Could not find first log file name in binary log index file
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

а также

com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.000001' at 760357, the last event read from './mysql-bin.000001' at 1888540, the last byte read from './mysql-bin.000001' at 1888540.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:885)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:564)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:796)
        at java.lang.Thread.run(Thread.java:748)

Справочная документация

关注_小旋锋_微信公众号