1. Предпосылки
С постепенным развитием Mafengwo у нас появляется все больше и больше бизнес-данных.Простое использование MySQL больше не может удовлетворить наши потребности в запросах данных, таких как многомерный поиск данных, таких как товары и заказы.
Использование Elasticsearch для хранения бизнес-данных вполне может решить потребности в поиске в нашем бизнесе. После разнородного хранения данных возникает проблема синхронизации данных.
2. Существующие методы и проблемы
Для синхронизации данных нашим текущим решением является настройка таблицы центра обработки данных. Необходимость извлечения бизнес-данных, объединенных в таблицу MySQL, эта таблица соответствует средним бизнес-потребностям индекса Elasticsearch, и каждый столбец соответствует индексу сопоставления полей. С помощью скрипта Crontab, считывает UTime промежуточной таблицы MySQL больше, чем время последнего чтения всех данных, т. е. в пределах периода приращения времени, записываемого Elasticsearch.
Следовательно, после того, как данные соответствующих полей изменяется в бизнес-логике, изменение промежуточной таблицы MySQL необходимо учитывать одновременно; если данные в Elasticsearch должны быть более непосредственными, она должна быть написана на Elasticsearch в то же время.
С ростом количества бизнес-данных увеличивается и объем данных в промежуточных таблицах MySQL. Когда в индекс Elasticsearch необходимо добавить новое поле сопоставления, в соответствующую промежуточную таблицу MySQL также необходимо добавить новый столбец.В таблице с огромным объемом данных на расширение столбца уходит много времени.
Более того, поля Mapping в индексе Elasticsearch увеличиваются по мере развития бизнеса, и бизнес-стороне необходимо добавить соответствующий метод записи в промежуточную таблицу MySQL, что также влечет за собой некоторые затраты на разработку.
3. Дизайн схемы
1. Общая идея
Некоторые существующие инструменты синхронизации данных с открытым исходным кодом, такие как DataX от Alibaba, в основном основаны на запросах для получения источников данных, что приведет к проблеме определения приращения (например, использование поля utime для решения и т. д.) и частота опроса, а некоторые из наших бизнес-сценариев предъявляют высокие требования к синхронизации данных в режиме реального времени. Чтобы решить эту проблему, мы предлагаем метод, основанный на MySQL Binlog для синхронизации данных MySQL с идеями Elasticsearch. Binlog по протоколу репликации MySQL используется для синхронизации данных из данных, вызывающих выстрелы, поэтому нам необходимо, чтобы в него записывались данные Elasticsearch и выполнялись требования своевременности синхронизации данных.
Использование данных BINLOG для синхронизации Elasticsearch бизнес-сторона может сосредоточиться на работе бизнес-логики на MySQL и больше не заботится о проблеме синхронизации данных на elasticsearch, что снижает ненужный код синхронизации и позволяет избежать длительной проблемы с трудоустройством расширения Промежуточные столбцы таблицы.
После исследования мы используем Project Project Project Go-Mysql-Elasticsearch и ряд пользовательских разработанных для стопки сотовой связи лошадей и реальной бизнес-среды.
2. Гарантия правильности синхронизации данных
Данные Binlog всех таблиц компании являются конфиденциальными данными и не могут быть получены напрямую.Для удовлетворения потребностей каждого направления бизнеса они предоставляются пользователю в виде доступа к Kafka, и пользователю необходимо подать заявку на соответствующее разрешение на использование данных Binlog. После получения разрешения на использование потребитель читает его в форме группы потребителей.
Этот метод обеспечивает безопасность данных Binglog, но создает проблемы с правильностью синхронизации данных. Поэтому мы разработали некоторые механизмы для обеспечения упорядоченного и полного получения источников данных.
1).Последовательный
Чтобы получить данные BINLOG через KAFKA, сначала необходимо обеспечить последовательность полученных данных. Строго говоря, Кафка не может гарантировать порядок глобальных сообщений, только локальный заказ, поэтому он не может гарантировать, что все данные Binlog могут прибыть на потребителя в порядке.
Но данные по каждому разделу заказываются. Чтобы получить Binglog каждого ряда записей MySQL в порядке, мы хвом каждый Binlog к каждому разделу в соответствии с его первичным ключом, чтобы убедиться, что все данные Binlog той же записи MySQL отправляются в тот же раздел.
В случае нескольких потребителей раздел будет назначен только одному потребителю, что также гарантирует упорядоченное обновление данных в разделе в Elasticsearch.
2). Честность
Учитывая, что программа синхронизации может столкнуться с различными нормальными или аварийными выходами, а также перебалансировкой при изменении количества потребителей, нам необходимо убедиться, что данные Binlog не могут быть потеряны ни при каких обстоятельствах.
Используя механизм смещения Kafka, после подтверждения того, что данные сообщения были успешно записаны в Elasticsearch, смещение сообщения принимает значение Commit, что обеспечивает целостность данных. Для сценария использования синхронизации данных повторное потребление не повлияет на последовательность и целостность данных.
4. Техническая реализация
1. Функциональный модуль
Настроить модуль парсинга
Отвечает за синтаксический анализ файлов конфигурации (в формате toml или json) или строк json, настроенных в центре конфигурации (Skipper). В том числе конфигурация кластера Kafka, конфигурация адреса Elasticsearch, конфигурация метода ведения журнала, таблица и поле базы данных MySQL, а также конфигурация соответствия индекса и сопоставления Elasticsearch и т. д.
модуль правил
Модуль правил определяет, в какой индекс Elasticsearch должен быть записан фрагмент данных Binlog, поле MySQL, соответствующее _id документа, соответствующую связь между каждым полем MySQL в Binlog и отображением индекса, а также тип записи и т. д.
Во время процесса локализации, согласно нашему бизнесу, в соответствии с нашим бизнес-сценарием, мы добавили, где условия суждения за каждое поле MySQL Table отфильтровывают ненужные данные Binlog.
Модули, связанные с Kafka
Этот модуль отвечает за подключение к кластеру Kafka и получение данных Binlog.
В процессе локализации большинство функций этого модуля были инкапсулированы в общий потребительский клиент Golang Kafka. Включая сертификацию SASL, требуемую платформой подписки Dba Binlog, и использование данных из Offset в указанный момент времени.
Модуль разбора данных бинлога
Парсинг данных Binlog в исходном проекте направлен на исходные данные Binlog, включая реализацию парсинга протокола репликации. В нашем сценарии использования данные Binlog уже представляют собой строку json, проанализированную каналом, поэтому функция этого модуля упрощена.
Пример строки binlog json
Вышеупомянутая упрощенная строка binlog json. Правило конфигурации может быть получено через базу данных и таблицу binlog. Согласно правилу конфигурации, ключ-значение в данных строится в карту ключ-значение, которая соответствует соответствующему индексу Elasticsearch. . , включая некоторые преобразования типов данных:
Elasticsearch Связанные модули
Карта «ключ-значение», созданная модулем анализа данных Binlog, собирается модулем в полезную нагрузку обновления, которая запрашивает интерфейс _bulk и записывается в Elasticsearch. Учитывая давление записи на Elasticsearch, когда MySQL часто обновляется, карта ключ-значение будет временно храниться в срезе, а интерфейс _bulk Elasticsearch будет вызываться каждые 200 мс или когда длина среза достигает определенной длины (что может настраивается конфигурацией).
2. Индивидуальная разработка
1) Адаптация к потребностям бизнеса
upsert
Данные индекса, используемые в бизнесе, могут поступать из нескольких разных таблиц. Когда данные одного и того же документа поступают из разных таблиц, данные, которые поступают первыми, являются индексом, а данные, которые поступают позже, являются обновлением. Когда мы не можем контролировать последовательность, необходимо реализовать функцию upsert. Добавьте параметр _bulk
{
"doc_as_upsert" : true
}
Filter
В реальных бизнес-сценариях данные, необходимые бизнесу, могут быть только частью данных в определенной таблице.Например, поле типа используется для идентификации источника данных, а данные типа = 1 или 2 нужны только для синхронизации с Elasticsearch. Мы расширили конфигурацию правила для поддержки требований фильтрации указанных полей Binlog, например:
select * from sometable where type in (1,2)
2) Быстрое увеличение
Синхронизация данных обычно делится на полную и инкрементную. При доступе к бизнесу вам сначала необходимо импортировать существующие исторические данные MySQL о бизнесе в Elasticsearch, что является полной синхронизацией. Данные, добавленные во время процесса полной синхронизации и впоследствии, являются добавочными данными.
После завершения полной синхронизации данных, если Kafka используется из самых старых, а данные в очереди велики, потребуется много времени, чтобы добавочные данные догнали текущий прогресс. Чтобы получить требуемый инкрементный бинлог быстрее, перед тем, как Consumer Group использует Kafka, сначала получите значения смещения раздела каждой темы в указанное время и зафиксируйте эти смещения, чтобы при подключении Consumer Group к Kafka кластер, он начнется прямо сейчас, отправленное смещение начнет потребляться, и требуемый инкрементальный бинлог может быть получен немедленно.
3) Микросервисы и центр конфигурации
Проект развертывается с использованием микросервисов Mafengwo, которые обеспечивают быструю онлайн-поддержку для новых сервисов доступа и могут легко и быстро расширять возможности потребителей при внезапном увеличении объема бизнес-данных Binlog.
Центр конфигурации Mafengwo поддерживает управление конфигурацией каждой службы доступа.По сравнению с файлом конфигурации формата toml в проекте с открытым исходным кодом, центр конфигурации можно использовать для более удобного управления конфигурацией различных служб и различных сред.
V. Регистрация и мониторинг
Как видно из приведенного выше рисунка, средняя задержка синхронизации данных каждой таблицы заказа составляет около 1 с. Подключите задержанные данные к ElastAlert и отправьте уведомление о тревоге, когда будет слишком много задержанных данных.
Другим индикатором мониторинга является обнаружение сердцебиения. Создана отдельная таблица, которая не зависит от бизнеса. Сценарий Crontab изменяет таблицу каждую минуту и проверяет, является ли последняя модификация синхронизирована с указанным индексом. Если нет, уведомление о тревоге отправляется. Обнаружение сердцебиения контролирует кафка, микросервисы и ES во всем процессе. Если в любой ссылке есть проблема, что вызывает данные о синхронизации, это будет первым уведомлением.
6. Заключение
Самая важная деловая сторона, подключенная в настоящее время, — это индекс заказов электронной коммерции, а задержка синхронизации данных стабильна и составляет около 1 с. Практика локализации этого проекта с открытым исходным кодом надеется помочь в некоторых бизнес-сценариях, требующих синхронизации данных Elasticsearch.
автор этой статьи: Чжан Кун (Zhang Kun), старший инженер по исследованиям и разработкам в отделе исследований и разработок в сфере сотовой электронной коммерции Ma.
(Исходное содержание Ma Honeycomb Technology, пожалуйста, сохраните исходный код и QR-код в конце статьи для перепечатки, спасибо)