задний план
В моделировании хранилища данных необработанные данные бизнес-уровня без какой-либо обработки называются данными ODS (Operational Data Store). В интернет-предприятиях общие данные ODS включают данные бизнес-журнала (Log) и данные бизнес-базы данных (DB). Для бизнес-данных БД сбор бизнес-данных из реляционных баз данных, таких как MySQL, и их импорт в Hive является важной частью создания хранилища данных.
Как точно и эффективно синхронизировать данные MySQL с Hive? Обычно используемое решение состоит в том, чтобы получать и загружать данные пакетами: напрямую подключать MySQL к данным в таблице Select, затем сохранять их в локальном файле в качестве промежуточного хранилища и, наконец, загружать файл в таблицу Hive. Преимущество этого решения в том, что оно простое в реализации, но с развитием бизнеса постепенно обнажаются недостатки:
- Узлотешение производительности: с ростом масштабирования бизнеса, поток данных выбора из MySQL -> Сохранить на LocalFile -> нагрузку на улей занимает дольше и дольше, что не может соответствовать временным требованиям продукции склада данных.
- Выбор большого объема данных непосредственно из MySQL оказывает большое влияние на MySQL, что, вероятно, приведет к медленным запросам и повлияет на нормальные услуги в бизнес-линии.
- Поскольку синтаксис самого Hive не поддерживает примитивы SQL, такие как обновление и удаление, он не может поддерживать данные в MySQL, где происходит обновление/удаление.
Чтобы полностью решить эти проблемы, мы постепенно перешли к техническому решению CDC (Change Data Capture) + Merge, то есть набору решений, таких как сбор Binlog в реальном времени + автономная обработка Binlog для восстановления бизнес-данных. Binlog — это двоичный журнал MySQL, который записывает все изменения данных в MySQL.Синхронизация master-slave самого кластера MySQL основана на Binlog.
В этой статье в основном рассказывается, как реализовать точный и эффективный ввод данных БД в хранилище данных с помощью двух аспектов сбора Binlog в реальном времени и автономной обработки бизнес-данных восстановления Binlog.
Общая структура
Общая архитектура показана на рисунке выше. Что касается сбора Binlog в режиме реального времени, мы используем проект Alibaba с открытым исходным кодом Canal, который отвечает за извлечение Binlog из MySQL в режиме реального времени и выполнение надлежащего анализа. После того, как Binlog будет собран, он будет временно сохранен в Kafka для дальнейшего использования. Общая часть сбора данных в реальном времени показана красной стрелкой на рисунке.
Binlog деталь в автономном режиме, черные стрелки, как показано, по следующим шагам для восстановления таблицы MySQL на Tive:
- Используя проект Linkedin с открытым исходным кодом Camus, он отвечает за ежечасную передачу данных Binlog из Kafka в Hive.
- Для каждой таблицы ODS сначала необходимо сделать одноразовый снимок (Snapshot) и прочитать существующие данные в MySQL в Hive.Нижний уровень этого процесса — прямое подключение к MySQL для выбора данных.
- Для каждой таблицы ODS слияние выполняется каждый день на основе данных запасов и двоичного журнала, создаваемого постепенно в течение дня для восстановления бизнес-данных.
Давайте вернемся назад и посмотрим на различные проблемы, возникающие в схеме пакетной выборки и загрузки, представленной в фоновом режиме.Почему эта схема может решить вышеуказанные проблемы?
- Во-первых, Binlog генерируется потоковой передачей.Благодаря сбору Binlog в реальном времени часть требований к обработке данных распределяется на поток в реальном времени из пакетной обработки один раз в день. Независимо от производительности или ограничения доступа к MySQL, будут значительные улучшения.
- Во-вторых, Бинлог сам фиксирует тип изменения данных (Вставка/Обновление/Удалить), благодаря некоторой семантической обработке можно добиться точного восстановления данных.
Binlog Collection в реальном времени
Сбор Binlog в режиме реального времени включает в себя два основных модуля: один — CanalManager, который в основном отвечает за назначение задач сбора, мониторинг и сигнализацию, управление метаданными и связь с внешними зависимыми системами; другой — Canal и CanalClient, которые фактически выполняют задачи по сбору.
Когда пользователь отправляет запрос на сбор Binlog для БД, CanalManager сначала вызывает соответствующий API платформы DBA, чтобы получить информацию об экземпляре MySQL, где находится БД, чтобы выбрать наиболее подходящую машину для сбора Binlog. Затем экземпляр коллекции (экземпляр Canal) распространяется на соответствующий сервер Canal, а именно CanalServer. При выборе конкретного CanalServer CanalManager будет учитывать такие факторы, как балансировка нагрузки и передача данных между машинными помещениями, и предпочтительно выбирать машины с более низкой нагрузкой и передачей в том же регионе.
После того, как CanalServer получит запрос на сбор, он зарегистрирует информацию о сборе в ZooKeeper. Регистрация включает в себя:
- Постоянный узел, названный в честь имени экземпляра.
- Зарегистрируйте временный узел с собственным ip:port под этим постоянным узлом.
Это сделано для двух целей:
- Высокая доступность: CanalManager распределяется по экземплярам, вы выбираете два CanalServer, один из которых является рабочим узлом, а другой — резервным узлом. Резервный узел следит за экземпляром.После отказа работающего узла временный узел исчезает, а резервный узел захватывается. Это достигает цели катастрофы.
- Взаимодействие с CanalClient: после того, как CanalClient обнаружит работающий CanalServer, где находится экземпляр, за который он отвечает, он подключится для получения данных Binlog, отправленных CanalServer.
Подписка на Binlog основана на базе данных MySQL в качестве степени детализации, а Binlog базы данных соответствует теме Kafka. В базовой реализации все подписанные БД в экземпляре MySQL обрабатываются одним и тем же экземпляром Canal. Это связано с тем, что генерация Binlog основана на степени детализации экземпляра MySQL. CanalServer отбрасывает данные Binlog без подписки, а затем CanalClient распространяет полученный Binlog в Kafka в соответствии с гранулярностью БД.
Восстановление данных MySQL в автономном режиме
После завершения сбора Binlog следующим шагом будет использование Binlog для восстановления бизнес-данных. Первая проблема, которую необходимо решить, это синхронизация Binlog от Kafka до Hive.
Kafka2Hive
Управление всей задачей Kafka2Hive осуществляется в рамках ETL платформы данных Meituan, включая механизм выражения и планирования примитивов задач, которые аналогичны другим ETL. Нижний уровень принимает проект Camus с открытым исходным кодом LinkedIn и выполняет целевую вторичную разработку для завершения реальной работы по передаче данных Kafka2Hive.
Вторичная разработка для Камю
Binlog, хранящийся в Kafka, не имеет схемы, в то время как таблица Hive должна иметь схему, а структура ее разделов и полей должна быть удобной для эффективного последующего использования. Первая модификация Camus заключается в том, чтобы преобразовать Binlog в Kafka в формат, соответствующий целевой схеме.
Вторая модификация Camus определяется структурой ETL Meituan. В нашей системе планирования задач в настоящее время анализируются только восходящие и нисходящие зависимости для задач в одной и той же очереди планирования, и зависимости не могут быть установлены между очередями планирования. Во всем процессе MySQL2Hive задача Kafka2Hive должна выполняться один раз в час (почасовая очередь), а задача слияния должна выполняться один раз в день (дневная очередь). Запуск задачи слияния должен строго зависеть от завершения ежечасной задачи Kafka2Hive.
Для решения этой проблемы введем задачу Checkdone. Задача Checkdone — это ежедневная задача, которая в основном отвечает за определение того, успешно ли завершен Kafka2Hive предыдущего дня. Если она завершается успешно, задача Checkdone выполняется успешно, так что последующая задача Merge может быть запущена правильно.
Логика обнаружения Checkdone
Как обнаруживается Checkdone? После того, как каждая задача Kafka2Hive успешно завершает передачу данных, Camus отвечает за запись времени начала задачи в соответствующем каталоге HDFS. Checkdone просканирует все временные метки предыдущего дня.Если максимальная временная метка превысила 0, это означает, что задачи Kafka2Hive предыдущего дня были успешно выполнены, и Checkdone завершил обнаружение.
Кроме того, поскольку сам Camus только завершает процесс чтения Kafka, а затем записывает файлы HDFS, он также должен завершить загрузку раздела Hive, прежде чем его можно будет запрашивать ниже по течению. Итак, последний шаг всей задачи Kafka2Hive — загрузить разделы Hive. Таким образом, вся задача считается успешно выполненной.
Каждая задача Kafka2Hive отвечает за чтение определенной темы и запись данных Binlog в таблицу в библиотеке original_binlog, которая является original_binlog на предыдущем рисунке.db, в котором хранятся все бинлоги, соответствующие базе данных MySQL.
На приведенном выше рисунке показана структура каталогов файлов в HDFS после завершения Kafka2Hive. Если БД MySQL называется user, соответствующий Binlog хранится в таблице original_binlog.user. В готовом каталоге время запуска всех успешно выполненных задач Kafka2Hive за день сохраняется для использования Checkdone. Бинлог каждой таблицы организован в раздел, например, Бинлог таблицы userinfo хранится в разделе table_name=userinfo. Под разделом первого уровня каждого table_name раздел второго уровня организован с помощью dt. Файлы xxx.lzo и xxx.lzo.index на рисунке хранят данные binlog, сжатые с помощью lzo.
Merge
После успешного размещения Binlog в хранилище следующим шагом будет восстановление данных MySQL на основе Binlog. Процесс слияния выполняет две функции: во-первых, он сохраняет данные Binlog, сгенерированные за день, в таблице Delta, а затем выполняет слияние на основе первичного ключа с существующими данными о запасах. Данные в дельта-таблице — это последние данные за день.Если часть данных изменяется несколько раз в течение одного дня, в дельта-таблице сохраняются только данные после последнего изменения.
В процессе слияния дельта-данных и биржевых данных требуется уникальный ключ, чтобы определить, являются ли они одними и теми же данными. Если одна и та же часть данных появляется и в таблице inventory, и в таблице Delta, это означает, что эта часть данных была обновлена, и в качестве окончательного результата выбраны данные в таблице Delta; в противном случае никаких изменений не произошло, и данные в исходной инвентарной таблице сохраняются как окончательный результат Окончательный результат. Результатом слияния будут данные Insert Overwrite в исходную таблицу, а именно origindb на рисунке.table.
Пример процесса слияния
Следующее подробное описание с примером процесса слияния.
Таблица данных имеет два столбца: идентификатор и значение, где идентификатор — это первичный ключ. При извлечении дельта-данных для нескольких обновлений одних и тех же данных выбирается только последнее обновленное. Таким образом, для данных с id=1 последнее обновленное значение value=120 записывается в таблицу Delta. После слияния дельта-данных и биржевых данных в окончательный результат вставляется новый фрагмент данных (id=4), два фрагмента данных обновляются (id=1 и id=2), а один фрагмент данных остается неизменным ( идентификатор = 3).
По умолчанию мы используем первичный ключ таблицы MySQL в качестве уникального ключа для этого решения. Бизнес также может настроить уникальный ключ, отличный от MySQL, в соответствии с реальной ситуацией.
Выше представлена общая архитектура сбора данных на основе Binlog и восстановления данных ODS. Далее в основном представлены практические бизнес-задачи, которые мы решаем с двух сторон.
Практика 1: Поддержка подбазы данных и подтаблицы
С расширением масштаба бизнеса MySQL имеет все больше и больше подбаз данных и подтаблиц, а количество подтаблиц во многих компаниях составляет порядка нескольких тысяч. Студенты, занимающиеся общей разработкой данных, должны собрать эти данные вместе для анализа. Если каждая таблица сегментов синхронизируется вручную, а затем агрегируется в Hive, такие затраты трудно принять. Поэтому нам необходимо завершить агрегацию подтаблиц на уровне ODS.
Прежде всего, когда бинлог собирается в реальном времени, мы поддерживаем запись бинлога разных БД в один и тот же топик Kafka. При подаче заявки на сбор Binlog пользователи могут одновременно выбрать несколько физических БД с одной и той же бизнес-логикой. Через коллекцию на уровне коллекции Binlog журналы Binlog всех подбаз данных будут записаны в одну и ту же таблицу Hive, поэтому, когда нисходящий поток выполняет слияние, необходимо прочитать только одну таблицу Hive.
Во-вторых, конфигурация задачи слияния поддерживает регулярное сопоставление. Настроив регулярное выражение, которое соответствует правилам именования бизнес-подтаблиц, задача слияния может узнать, какие бинарные журналы таблиц MySQL ей нужно агрегировать, а затем выбрать данные соответствующего раздела для выполнения.
Таким образом, благодаря работе двух уровней завершается слияние подбазы данных и подтаблиц на уровне ODS.
Здесь есть техническая оптимизация: при выполнении Kafka2Hive мы обрабатываем имя таблицы в соответствии с правилами разделения бизнес-таблиц и преобразуем имя физической таблицы в имя логической таблицы. Например, имя таблицы userinfo123 будет преобразовано в userinfo, а его данные Binlog будут храниться в разделе table_name=userinfo таблицы original_binlog.user. Целью этого является предотвращение основного давления, вызванного чрезмерными маленькими файлами HDFS и разделами Hive.
Практика 2. Удаление поддержки событий
Операция удаления очень распространена в MySQL.Поскольку Hive не поддерживает Delete, если вы хотите удалить данные, удаленные в MySQL, в Hive, вам нужно использовать метод «карусели».
Для процесса Merge, который должен обрабатывать событие Delete, используются следующие два шага:
- Во-первых, извлеките данные, которые произошли, поскольку BinLog сам записывает тип события, этот шаг прост. Объем данных (таблица A) удаляется вместе с удаленными данными (таблица B), и если все данные могут быть объединены с обеих сторон, данные удаляются. Следовательно, выбираются данные, которые записывают NULL, соответствующие таблице B, то есть данные, которые следует сохранить.
- Затем выполните обычное слияние сохраненных данных, полученных выше, в соответствии с описанным выше процессом.
Резюме и перспективы
В качестве основы для производства хранилища данных служба MySQL2Hive на основе Binlog, предоставляемая платформой данных Meituan, в основном охватывает все направления бизнеса в Meituan.В настоящее время она способна удовлетворить потребности большинства предприятий в синхронизации данных и добиться точной и эффективной Данные БД, складирование. В будущем мы сосредоточимся на решении одноточечной проблемы CanalManager и создадим архитектуру аварийного восстановления для нескольких машинных комнат, чтобы обеспечить более стабильное развитие бизнеса.
В этом документе в основном представлена архитектура этой службы из коллекции потоковой передачи Binlog и восстановления данных ODS на основе Binlog, а также представлены некоторые типичные проблемы и решения, с которыми мы столкнулись на практике. Я надеюсь, что это может дать другим разработчикам некоторую справочную информацию, и приглашаю всех общаться с нами.
Набор персонала
Если вы заинтересованы в нашей работе, пожалуйста, отправьте свое резюме наwangmengmeng05@meituan.com, Присоединяйтесь к нам в решении проблемы сбора и передачи массивных данных!