Решения и лучшие практики для потоковой передачи пакетных данных Volcano Engine

задняя часть Большие данные
Решения и лучшие практики для потоковой передачи пакетных данных Volcano Engine

Платформа качества данных Volcano Engine была отшлифована за годы обслуживания заголовков ByteDance Today, Douyin и других компаний. Столкнувшись со сложными сценариями качества данных различных линеек продуктов, таких как Toutiao и Douyin, как платформа качества данных может удовлетворить разнообразные потребности?

В этой статье будет рассказано, как платформа качества данных Volcano Engine устраняет конфликт между проверкой качества данных, большим потреблением вычислительных ресурсов и длительным временем вычислений для проверки в сценариях с большими данными, а также как платформа качества данных использует набор архитектурных фреймворков для удовлетворения требований потоковая и пакетная обработка, контроль качества данных.

Что такое качество данных

В широком смысле качество данных определяется как степень, в которой данные соответствуют набору неотъемлемых характеристик (параметров качества). Отрасль обычно имеет 6 измерений:

  • честность: Относится к тому, являются ли записи и информация о данных полными и есть ли какие-либо пропущенные ситуации. Отсутствующие данные в основном включают в себя отсутствие записей и отсутствие информации определенного поля в записях, оба из которых приводят к неточным статистическим результатам, поэтому целостность является основной гарантией качества данных. При мониторинге необходимо учитывать два аспекта: меньше ли количество элементов данных, не пропущены ли значения некоторых полей. Мониторинг целостности в основном происходит на уровне журнала, а проверка целостности данных обычно выполняется при доступе к данным.
  • точность: Относится к тому, будет ли информация и данные, записанные данные в данных, и есть ли аномалии или ошибки. Как правило, мониторинг точности в основном фокусируется на мониторинге данных бизнес-результата, таких как ежедневная деятельность, доход и другие данные нормальными.
  • последовательность: Относится к тому, согласуются ли результаты одного и того же индикатора в разных местах. Несоответствие данных обычно возникает, когда система данных достигает определенного уровня сложности, и один и тот же индикатор будет рассчитываться в нескольких местах.Из-за разницы в калибре расчета или разработчиков легко получить разные результаты для одного и того же индикатора.
  • своевременность: Следующим шагом после обеспечения целостности, точности и согласованности данных является обеспечение возможности своевременного предоставления данных, чтобы они отражали ценность данных. Своевременность легко понять.Главный вопрос заключается в том, достаточно ли высока скорость расчета данных.Это может быть отражено в мониторинге качества данных тем, рассчитываются ли данные результатов мониторинга до указанного момента времени.
  • нормативный: Относится к тому, хранятся ли данные в соответствии с требуемыми правилами, такими как проверка почтового ящика, проверка IP-адреса, проверка формата телефона и т. д., что имеет определенное семантическое значение.
  • уникальность: Указывает, повторяются ли данные, например уникальное значение поля, повторяющееся значение поля и т. д.

У нас есть некоторые процессы и спецификации для качества данных, и мы разработали платформу качества данных для некоторых из вышеперечисленных параметров, уделяя особое внимание качеству данных и его производственной цепочке.

На рисунке выше показано, какие функции может выполнять платформа качества данных в процессе разработки данных:

  • исследование данных: Вы можете просматривать детали данных и их распределение по различным измерениям.
  • Сравнение данных: Разработчики часто могут столкнуться с несоответствием онлайн-таблицы и тестовой таблицы, поэтому мы предоставляем функцию сравнения данных в ссылке онлайн-задачи.
  • мониторинг задач: Мониторинг онлайн-данных, обеспечение функций сигнализации и предохранения.

Наиболее репрезентативной функцией платформы качества данных является обнаружение дублирования первичного ключа в данных таблицы Hive, созданных платформой разработки данных, и оповещение в случае дублирования.

Наиболее полезным сценарием для мониторинга качества данных является предотвращение распространения проблем с данными вниз по течению. Например: задача данных создает таблицу Hive, которая может синхронизировать некоторую информацию с хранилищем метаданных Hive (HMS). Может быть определенная задержка в архитектуре ведущий-подчиненный HMS.Если есть проблема с HMS, нижестоящие задачи могут читать грязные данные.В это время, если мы используем мониторинг качества данных, мы можем вовремя обнаружить проблемы и предотвратить последующие задачи от запуска.

Проблемы с качеством данных

Каковы наши текущие проблемы качества данных? Это можно понять через несколько случаев пользователей.

User Story 1

Для коммерческой системы продукта уровня трафика ожидается, что количество журналов уровня M в секунду будет отслеживаться на втором уровне, а задержка журнала и ключевые поля должны быть NULL, а T + 1 для обнаружения логарифмическая частота колебаний.

User Story 2

Во внутренней бизнес-системе логи хранятся в ES, есть надежда, что колебания логов в предыдущем цикле выявляются каждые 5 минут.

User Story 3

Для внутренней индикаторной платформы бизнес-данные регулярно синхронизируются из Hive в ClickHouse, я надеюсь после каждой задачи синхронизации проверять согласованность индикаторов в Hive и ClickHouse.

Благодаря приведенному выше введению у вас должно быть общее представление о текущих проблемах с качеством данных, которые необходимо решить. Кто-то из студентов может сказать, что я тоже работал на платформе качества данных, и задача не сложная, в общем, нужно производить различные вычисления на данных и сравнивать рассчитанные пороги, вообще-то расчет напрямую зависит от Spark двигатель или двигатель Hive. Ведь на самом деле так выглядит наше качество данных в начале. Так почему же он эволюционировал до настоящего времени и с какими проблемами мы столкнулись?

Во-первых, требования к сцене очень сложны:

  1. Больше никакого офлайн-мониторинга, с ним все знакомы, в основном для мониторинга качества данных разных хранилищ, таких как Hive или ClickHouse.
  2. К внутренней рекламной системе ByteDance предъявляются высокие требования по своевременности и точности: по словам студентов-рекламистов, если микропакетная система используется только для одного теста каждые 10 минут, онлайн-потери могут исчисляться миллионами и даже десятками миллионов. . Поэтому у студентов в рекламной системе относительно высокие требования к производительности в реальном времени.
  3. Другой — мониторинг потоковой задержки в сложных топологиях.
  4. Последним является микропакет, который относится к планированию времени в течение определенного периода времени.В некоторых сценариях потоковой передачи, когда Kafka импортируется в ES, необходимо сравнивать предыдущий цикл каждые несколько минут.

Кроме того, различные продукты ByteDance будут генерировать массивные данные журналов, и нам нужно использовать ограниченные ресурсы, чтобы удовлетворить все потребности в мониторинге качества.

Столкнувшись с этими проблемами, каковы наши решения?

Решения для качества потоковой передачи пакетных данных

Функциональная архитектура продукта

Решение Volcano Engine для потоковой передачи пакетных данных имеет 4 основные функции:

  • Автономный мониторинг качества данных: Решайте сценарии пакетного и микропакетного мониторинга, поддерживайте Hive, ClickHouse, ES и другие источники данных, а также используйте несколько параметров мониторинга, таких как поля и уникальность, что позволяет выполнять мониторинг с помощью агрегирования пользовательских параметров SQL.
  • Мониторинг качества потоковых данных: решение сценариев потокового мониторинга и поддержка источников данных, таких как Kafka/BMQ.
  • исследование данных: Решите вопрос о содержании данных до разработки данных и поддержите источник данных Hive.
  • Сравнение данных: решить проблему согласованности данных между старыми и новыми таблицами и поддерживать источники данных Hive/Hive SQL.

структура системы

На приведенном выше рисунке представлена ​​схема архитектуры системы платформы качества данных, которая в основном разделена на 5 частей:

  • Scheduler: Внешний планировщик, запускает автономный мониторинг. В основном есть два типа:
    • Обеспечить внешние задачи вызова API;
    • Планирование времени, вызов данных через calljob.
  • Backend: Серверная служба, частичный уровень службы, обработка бизнес-логики. В основном отвечает за:
    • Взаимодействие между платформой качества и внешним миром, все ответы API проходят через этот уровень;
    • Отправка задачи: правила, настроенные пользователем на платформе качества, будут помещены в бизнес-хранилище.После вызова планировщика Backend отправит конфигурацию параметров, связанных с задачей;
    • Получайте и оценивайте результаты мониторинга качества, а затем взаимодействуйте с внешними системами, чтобы при необходимости отправлять оповещения для уведомления пользователей.
  • Executor: модуль выполнения задач, лежащий в основе платформы, интегрирует некоторые механизмы, такие как исследование данных с использованием механизма OLAP. В части контроля качества для статистики данных используется мера Гриффина.
  • Monitor: это относительно независимый модуль, в основном предназначенный для потока служб состояния, предоставляющий такие функции, как повторные сигналы тревоги.
  • Alert Center: Качество платформы сильно зависит от платформы. Это внешняя тревожная служба, которая получает различные тревожные события.

Процесс обнаружения данных в автономном режиме

Давайте взглянем на процесс обнаружения данных в автономном режиме.

Процесс выполнения автономного мониторинга, исследования и сравнения данных является последовательным и в основном делится на 4 этапа:

  1. триггер монитора: Система планирования вызывает модуль качества Backend API;
  2. отправка домашнего задания: серверная часть отправляет задания Spark в Yarn в режиме кластера;
  3. вернуть результат: задание завершается (успех, сбой), и Драйвер синхронизирует результат с Бэкендом;
  4. триггер сообщения: Бэкэнд инициирует соответствующие действия (например, тревогу, подсказку сообщения) в зависимости от результата.

Мы резюмируем преимущества платформы качества данных:

  • Низкая связанность системы планирования: Платформа качества данных не сильно привязана к системе планирования.Как правило, для реализации взаимных вызовов можно использовать API бизнес-системы.
  • Эффективный запуск событий и широкие возможности горизонтального расширения Backend: Бэкэнд — это служба экземпляра без сохранения состояния.Если есть много бизнес-систем для мониторинга качества, Бэкенд можно развернуть горизонтально, получая запросы и отправляя задания.
  • Нет ограничений по квоте: Платформа сама не поддерживает очередь ресурсов, необходимую для мониторинга качества данных, а открывает это разрешение для пользователей и использует их собственные ресурсы для мониторинга ресурсов. Это превращает вопрос о квоте в вопрос о пользовательском ресурсе.

Конечно, ни один инструмент не может быть идеальным, и в платформе качества данных еще есть области, которые нужно улучшить:

  • Более тяжелые запросы, не требующие интенсивного использования ЦП: Дизайн всей платформы предназначен для выполнения требований офлайн-сценариев в форме отправки задач. Но затем мы обнаружили, что задания, которым на самом деле не нужно запускать Spark, все равно запускают задание Spark, например запрос ES SQL, что очень тяжело.
  • Опора на Yarn для планирования не стабильна: В случае нехватки ресурсов или переполненных задач на платформе выполнение или вызов задачи будет очень медленным.

Выполнение потокового мониторинга

Для мониторинга потоковых данных мы выбираем движок Flink, потому что потоковые данные отличаются от автономных данных, и мы не можем использовать моментальные снимки, чтобы получить процесс по низкой цене. Поэтому нам приходится полагаться на некоторые внешние базы данных временных рядов и механизмы правил для отображения мониторинга данных.

Процесс потокового мониторинга данных на платформе выглядит следующим образом:

  1. Создайте задание Flink в соответствии с определением правила;
  2. Регистрировать события тревоги Bosun в соответствии с условиями тревоги;
  3. Задания Flink потребляют данные Kafka, рассчитывают показатели мониторинга и записывают метрики;
  4. Данные временных рядов на основе Bosun Metrics, определение времени и срабатывание аварийных сигналов;
  5. Backend получает обратный вызов сигнала тревоги и обрабатывает логику отправки сигнала тревоги.

Далее основное внимание уделяется реализации двух модулей.

Реализация исполнителя

Executor — это приложение Spark, основанное на модуле Measure Apache Griffin. Особенности включают в себя:

  • Адаптировать источник данных
  • Преобразование данных в DataFrame
  • Правила транслируются в операции SQL
  • Результаты расчета

Выбор Исполнителя имеет следующие соображения:

  • Масштабируемость должна быть достаточно сильной, чтобы адаптироваться к различным источникам данных, таким как Hive, MySQL и т. д.
  • лучшая вычислительная производительность
  • Поддерживается достаточно типов типов мониторинга

Учитывая приведенную выше информацию, мы выбрали модуль Measure Apache Griffin в качестве исполнителя. Он разработан на основе Spark, может адаптироваться к различным источникам данных и сделал ряд расширений для DSL. Исходя из дизайна платформы, нам нужно больше взаимодействовать с Backend и отправлять данные обратно. На самом деле, сам Griffin Measure поддерживает некоторые базовые функции мониторинга качества данных, такие как обнаружение повторяющихся значений, пользовательский SQL и т. д. Здесь мы сосредоточимся на преобразовании модуля Measure:

  • Измените источник данных и приемник, чтобы он мог получить доступ к удаленному API через HTTP;
  • Некоторые улучшения и модификации функций, такие как: поддержка регулярных выражений;
  • Мониторинг потоковой передачи переключается с Spark Engine на Flink Engine, чтобы оптимизировать общее решение для мониторинга потоковой передачи. Мера сама по себе является частью экосистемы Spark и может использовать Spark Engine только для линейного управления или потоковой передачи микропакетного моделирования для мониторинга. У самого ByteDance есть определенные возможности Flink, а возможности Flink по обработке потоковых данных намного лучше, чем у микропакетов, поэтому мы сделали такое преобразование.

Мониторинг реализации

Модуль мониторинга в основном реализует функцию повторной и повторной аварийной сигнализации, а также инициирует соответствующие события (повторная тревога, повторная попытка сбоя и т. д.) в соответствии с типом события. Поскольку все бизнес-данные хранятся в MySQL, монитор перед платформой также относительно прост для повторных сигналов тревоги, то есть извлекает экземпляры, вызывающие тревогу, из MySQL напрямую посредством опроса, а затем отправляет сигнал тревоги посредством повторных представлений.

С появлением все большего количества правил мониторинга нагрузка на библиотеку будет очень высокой, а сканирование монитора также сталкивается с некоторыми узкими местами. Поэтому мы обновили техническую архитектуру монитора. Конкретное преобразование включает в себя:

  • Служба с отслеживанием состояния, главный узел предоставляет внешние службы; главный узел и резервное копирование обеспечивают высокую доступность.
  • Получение внутренних событий: сбой мониторинга, тревога
  • Очередь синхронизации памяти, механизм триггера событий

Лучшие практики

Некоторые реализации платформы качества данных были представлены ранее. Вот некоторые из наших лучших практик с точки зрения объема данных и ресурсов.

Информация о количестве строк в таблице — приоритетная выборка HMS

При внутреннем автономном мониторинге мониторинг количества строк таблицы очень велик, и по крайней мере 50% автономных правил могут отслеживаться по количеству строк таблицы. Для количества строк таблицы мы использовали Spark, Select Count* для отправки заданий ранее, что потребляет много ресурсов.

Позже мы сделали некоторые оптимизации. Во время представления задач базовый двигатель записывает запись строки таблицы в соответствующей информации раздела во время вывода выходных данных, и мы можем напрямую получить количество информации о количестве таблиц, непосредственно из раздела HMS, тем самым избегая подачи искры задача.,

Эффект от оптимизации очень очевиден.В настоящее время для мониторинга количества строк таблицы количество строк, полученных HMS, составляет около 90%, а среднее время выполнения мониторинга количества строк HMS находится на втором уровне.

Примечание: Эта функция должна продвигать поддержку базовых сервисов.Например, Spark должен записывать информацию, хранящуюся в локальной метрике, в HMS, а также должны поддерживать другие системы передачи данных.

Оптимизация офлайн-мониторинга

Эта часть основана на Мере Гриффина, Мера сама по себе имеет богатые функции, и мы сократили ее, чтобы сэкономить время. Основные отсечения и оптимизации включают в себя:

  • Вырезать некоторые аномальные функции сбора данных;
  • Оптимизируйте ненужные процессы соединения.

Кроме того, мы также оптимизировали параметры выполнения автономного мониторинга, в том числе:

  • В соответствии с разными типами мониторинга добавлять разные параметры (перемешивать в hdfs и т. д.);
  • По характеристикам мониторинга оптимизируются параметры по умолчанию (апгрейд vcore и т.д.).

Например: пользователь пишет SQL для объединения данных, а механизм выполнения может анализировать план выполнения. Для операций соединения перетасовка может быть очень большой, в этом случае некоторые параметры Spark мы будем открывать по умолчанию.

Размер таблицы данных прогнозируется на основе количества строк таблицы.Если таблица данных считается большой, Vcore и память будут точно настроены по умолчанию. Все вышеперечисленные оптимизации могут в определенной степени повысить производительность, а среднее время работы различных типов мониторинга на платформе сократилось более чем на 10%.

Представляем OLAP-движок

На платформе много таблиц данных и бизнес-таблиц (кроме таблицы журнала), а объем данных мониторинга таблицы на верхнем уровне хранилища данных не очень велик, что очень подходит для запросов OLAP.

В этом случае у нас есть сцена профилирования данных, которая вводит presto. В этом сценарии, прежде чем выполнять исследование Spark, после внедрения механизмов происходит сбой в предварительном порядке из-за быстрого, большого объема данных, вычислительная сложность задачи откатывается для отправки исследовательской работы Spark, средняя продолжительность исследования сокращается с предыдущей до текущей менее чем на 7 минут. 40-х, эффект очень значителен.

Проблемная выборка мониторинга потока и оптимизация единой темы

Выборка данных Кафки

Как правило, проблемы потоковой передачи данных являются общими проблемами, и проблемы могут быть обнаружены путем выборки данных. Поэтому мы разработали функцию выборки данных, чтобы уменьшить долю и потребление ресурсов данных. Flink Kafka Connector поддерживает выборку и может напрямую управлять смещением темы kafka для достижения цели выборки. Например, мы делаем выборку со скоростью 1%. Изначально нам нужно только ** машин для поддержки темы с разделами W.

Оптимизация нескольких правил для одной темы

Сначала мы определили правило для темы, а затем запустили задачу Flink для использования и выполнения правила. Позже мы обнаружили, что некоторые ключевые данные необходимо отслеживать в нескольких измерениях, то есть для определения нескольких измерений правил.Открывать задачи для каждого правила для потребления очень ресурсоемко, поэтому мы используем мониторинг не для интенсивного использования ЦП. операции, функции, мультиплексирование прочитанной части, выполнение нескольких правил в одном слоте, единое потребление уровня темы и выполнение всех связанных правил в одной задаче.

Будущее направление эволюции

Эта статья знакомит с реализацией и передовым опытом платформы качества данных и, наконец, обсуждает будущее направление развития платформы.

  • Базовый механизм представляет собой унифицированную поточно-пакетную интеграцию.: В настоящее время большинство автономных задач платформы выполняются на основе Spark, потоковые данные обрабатываются Flink, а в движке OLAP реализован режим presto, что приводит к относительно высоким затратам на эксплуатацию и обслуживание этой системной архитектуры. Мы видим, что текущие возможности Flink presto и возможности Flinkbatch также развиваются, поэтому мы постараемся сократить некоторые задачи в будущем, чтобы добиться действительно унифицированного движка.
  • разумный: внедрить алгоритмы, управляемые данными. Рассмотрите возможность внедрения методов машинного обучения, чтобы помочь в выборе пороговых значений или интеллектуальных аварийных сигналов, а также автоматически рекомендовать правила качества на основе уровней данных. Чтобы привести несколько примеров, например, мы можем решить пик праздничного трафика и увеличение обычного порога жесткого правила на основе интеллектуального мониторинга волатильности алгоритма временных рядов.
  • удобный: OLAP имеет значительное улучшение производительности, но в настоящее время мы используем его только для исследования данных. Впоследствии механизм OLAP можно применять для проверки качества, исследования данных, приложения для сравнения данных и процесса разработки данных.
  • оптимизация: например, с помощью одного задания одновременно запускайте несколько мониторов, сочетая мониторинг и исследование данных. Теперь мы пытаемся объединить создание правил качества данных и исследование данных, чтобы добиться соответствующей взаимосвязи между данными и правилами WYSIWYG.

Q&A

Q: Устранение проблем с качеством данных часто занимает много времени. Выполняли ли вы какую-либо работу по атрибуционному анализу проблем с качеством данных?

A: Эта проблема является очень серьезной проблемой. Здесь мы можем представить наше текущее мышление: учащиеся, работающие с алгоритмом перебора байтов, выполняют бурение данных, то есть выполняют исследование данных в каждой таблице связи данных. Если обнаружены проблемы с качеством, найдите поля выше по течению данных с помощью некоторого отношения, подобного происхождению и полям. В настоящее время мы все еще проводим этот процесс частичного исследования +, чтобы как можно скорее понять исходные данные, и в части анализа атрибуции нет прогресса.

Q: Как осуществляется замкнутый цикл качества данных: например, кто будет решать проблему качества данных? Как измеряется качество данных?

A: Кто решит проблему качества данных? Кто заботится о качестве данных, кто продвигает вперед, кто разрабатывает данные и кто решает проблемы с качеством данных. Это совместная проблема.

Как измерить качество данных? У нас есть некоторые управляемые внутренние индикаторы, такие как количество аварийных сигналов, частота аварийных сигналов любого ядра и так далее.

Q: Как обеспечить сквозную согласованность данных?

A: сквозная согласованность данных не может быть решена с помощью одного инструмента, и могут потребоваться некоторые решения, такие как: данные, сообщаемые с конца, в сочетании с системой скрытых точек для проверки данных, чтобы гарантировать точность данных, когда версия выпущена. Тем не менее, я думаю, что сквозная согласованность данных все еще отсутствует во всей отрасли, и если есть проблема на стороне бизнеса, ее трудно устранить. Если контролировать каждый уровень канала передачи данных, устранение неполадок может быть проще, но такой подход является дорогостоящим.