введение
В последние годы наблюдается растущий спрос на услуги передачи данных в режиме реального времени со стороны предприятий. В этой статье рассматриваются характеристики производительности и применимые сценарии общих компонентов данных в реальном времени, а также рассказывается, как Meituan создает хранилище данных в реальном времени с помощью механизма Flink для предоставления эффективных и надежных услуг данных в реальном времени. Ранее в нашем технологическом блоге Meituan была опубликована статья "Сравнение производительности между Flink и Storm, фреймворком для потоковых вычислений", в котором сравнивается вычислительная производительность Flink и Storm. В этой статье в основном описывается опыт использования Flink в реальном производстве данных.
Исходная архитектура платформы реального времени
На ранней стадии построения системы данных в реальном времени из-за меньшего спроса на данные в реальном времени полную систему данных невозможно было сформировать. Мы используем модель разработки «на всем пути до конца»: развертывая задания Storm на вычислительной платформе в реальном времени для обработки очередей данных в реальном времени, извлечения метрик данных и отправки их непосредственно в службы приложений в реальном времени.
Рис. 1. Исходная архитектура данных реального времениОднако по мере того, как продукты и деловые люди все чаще требуют данных в режиме реального времени, возникают новые проблемы.
- Индикаторов данных становится все больше и больше, а развитие «дымохода» приводит к серьезным проблемам со связностью кода.
- Требований становится все больше, некоторые требуют подробных данных, а некоторые требуют OLAP-анализа. Одной модели разработки сложно справиться с множеством потребностей.
- Отсутствует сложная система мониторинга для обнаружения и устранения проблем до того, как они повлияют на бизнес.
Построение хранилища данных в реальном времени
Чтобы решить вышеуказанные проблемы, мы решили использовать многоуровневую схему проектирования для создания хранилища данных в реальном времени на основе опыта создания автономных данных.Многоуровневая архитектура показана на следующем рисунке:
Рис. 2 Многоуровневая архитектура хранилища данных в реальном времениСхема состоит из следующих четырех слоев:
- Уровень ODS: Binlog и журналы трафика, а также очереди различных сервисов в реальном времени.
- Уровень детализации данных: бизнес-домен интегрирует и извлекает фактические данные, а также создает данные измерений в реальном времени с автономными полными данными и данными об изменениях в реальном времени.
- Слой сводки данных: используйте модель широкой таблицы, чтобы дополнить подробные данные многомерными данными и обобщить общие показатели.
- Уровень приложений. Уровень приложений, созданный для конкретных нужд, предоставляет внешние службы через инфраструктуру RPC.
Благодаря многоуровневому дизайну мы можем разместить процесс обработки данных на каждом уровне. Например, процессы фильтрации, очистки, стандартизации и десенсибилизации данных унифицированы на уровне детализации данных; общие сводные данные многомерного индекса обрабатываются на уровне сводки данных. Повысьте скорость повторного использования кода и общую эффективность производства. При этом типы задач, обрабатываемых на каждом уровне, схожи, и единое техническое решение позволяет оптимизировать производительность и сделать техническую архитектуру хранилища данных более лаконичной.
Технический отбор
1. Исследование механизма хранения
Дизайн хранилища данных в реальном времени отличается от автономного хранилища данных, в котором используется одна и та же схема хранения на всех уровнях, например, стратегии, хранящиеся в Hive и DB. Прежде всего, для таблицы промежуточного процесса принята схема смешения структурированных данных через хранилище очереди сообщений и быстродействующее хранилище КВ. Механизм вычислений в реальном времени может выполнять вычисления в реальном времени, отслеживая данные в очереди сообщений для использования сообщений. Данные в высокоскоростном хранилище KV можно использовать для быстрых ассоциативных вычислений, таких как многомерные данные. Во-вторых, на прикладном уровне настройте схему хранения для прямой записи в соответствии с характеристиками использования данных. Это позволяет избежать задержки обработки, вызванной потоком данных синхронизации на прикладном уровне автономного хранилища данных. Чтобы удовлетворить различные типы требований к данным в режиме реального времени и разумно спроектировать решения для хранения данных на всех уровнях, мы исследовали несколько решений для хранения данных, которые широко используются в Meituan.
Таблица 1 Список решений для хранения данныхплан | Преимущество | недостаток |
---|---|---|
MySQL | 1. Он имеет полные функции транзакций и может обновлять данные. 2. Поддержка SQL, низкая стоимость разработки. | 1. Стоимость горизонтального расширения высока, а хранилище легко стать узким местом 2. Частота обновления и запросов данных в реальном времени очень высока, а один онлайн-запрос приложения в реальном времени имеет более 1000 запросов в секунду. стоимость использования MySQL слишком высока. |
Elasticsearch | 1. Высокая пропускная способность, одна машина может поддерживать более 2500 запросов в секунду, а кластер может быстро масштабироваться. 2. Скорость ответа на запрос Term очень высока.Когда одна машина имеет более 2000 запросов в секунду, задержка запроса составляет 20 мс. | 1. Без встроенной поддержки SQL запрос DSL имеет определенный порог обучения 2. Производительность значительно падает при выполнении операций агрегации. |
Druid | 1. Поддерживает огромные объемы данных. При получении данных в реальном времени через Kafka одно задание может поддерживать 6W+ QPS 2. Данные могут быть агрегированы с помощью предварительных вычислений во время импорта данных, что сокращает объем хранилища данных. Повысить эффективность фактической обработки данных 3. Существует множество платформ анализа OLAP с открытым исходным кодом. Реализовано как суперсет. | 1. Предварительная агрегация не поддерживает подробный запрос 2. Не поддерживается операция соединения 3. Только добавление не поддерживает модификацию данных. Замена может производиться только в единицах Сегментов. |
Cellar | 1. Он поддерживает большой объем данных и использует архитектуру памяти и распределенного хранилища, а производительность хранилища очень высока 2. Пропускная способность хорошая, а средняя задержка составляет около 1 мс при обработке 3W+ QPS при чтении и записи запросы; Поддержка 10W+ QPS. | 1. Интерфейс поддерживает только KV, Map, List, атомарное сложение и вычитание и т. д. 2. Одно значение ключа не может превышать 1 КБ, а когда значение значения превышает 100 КБ, производительность значительно снижается. |
В соответствии с различными бизнес-сценариями решения для хранения, используемые на каждом уровне модели хранилища данных реального времени, примерно следующие:
Рис. 3 Многоуровневая архитектура хранилища данных в режиме реального времени- уровень детализации данныхДля некоторых сценариев данных измерений соответствующая частота может достигать 10w+ TPS, мы выбираем Cellar (внутренняя система хранения Meituan) в качестве хранилища, а служба измерения пакетов предоставляет данные измерений для хранилища данных в реальном времени.
- Уровень агрегации данныхДля общих сводных показателей, данных, которые нужно сопоставить с историческими данными, используется та же схема, что и с размерными данными, в качестве хранения через Подвал, а операция корреляции выполняется в виде сервисов.
- Уровень приложения данныхДизайн прикладного уровня относительно сложен, и после сравнения нескольких различных решений для хранения. Мы сформулировали основу для суждения на основе частоты чтения и записи данных 1000 запросов в секунду. Для приложений реального времени со средней частотой чтения и записи выше 1000 запросов в секунду, но менее сложными запросами, такими как бизнес-данные продавцов в реальном времени. Используйте Cellar в качестве хранилища для предоставления услуг передачи данных в режиме реального времени. Для некоторых приложений со сложными запросами и подробными списками в качестве хранилища больше подходит Elasticsearch. Частота некоторых запросов низкая, например, некоторые внутренние рабочие данные. Druid создает индексы, обрабатывая сообщения в режиме реального времени, и может быстро предоставлять функции анализа данных OLAP в реальном времени посредством предварительной агрегации. Для преобразования в реальном времени некоторых исторических версий продуктов данных хранилище MySQL также может использоваться для облегчения итерации продукта.
2. Исследование вычислительных движков
На ранней стадии создания платформы реального времени мы использовали движок Storm для обработки данных в реальном времени. Хотя движок Storm хорошо работает с точки зрения гибкости и производительности. Однако, поскольку API слишком низкоуровневый, в процессе разработки данных необходимо реализовать некоторые общие операции с данными. Например, сопоставление таблиц, агрегация и т. д. влекут за собой много дополнительной работы по разработке, не только вводят множество внешних зависимостей, таких как кэширование, но и производительность при реальном использовании не очень идеальна. В то же время, функции, поддерживаемые объектом данных Tuple в Storm, также очень просты, которые обычно требуют преобразования в объекты Java для обработки. Для такого типа модели данных, определенной на основе кода, обычно мы можем поддерживать ее только с помощью документации. Это не только требует дополнительных работ по обслуживанию, но также создает проблемы при добавлении и изменении полей. В целом сложно построить хранилище данных в реальном времени с помощью движка Storm. Нам нужна новая схема обработки в реальном времени, которая может обеспечить:
- Предоставляет высокоуровневый API для поддержки общих операций с данными, таких как ассоциативное агрегирование, предпочтительно SQL.
- Благодаря управлению состоянием и автоматической поддержке схем сохраняемости это снижает зависимость от хранилища.
- Облегчает доступ к службам метаданных и позволяет избежать управления структурами данных с помощью кода.
- Производительность обработки как минимум такая же, как у Storm.
Мы провели технический обзор основных вычислительных машин реального времени. В следующей таблице приведены характеристики различных двигателей:
Таблица 2 Список решений для вычислений в реальном временипроект/движок | Storm | Flink | spark-treaming |
---|---|---|---|
API | Гибкий низкоуровневый API и Trident API с гарантиями транзакций | Streaming API и Table API и поддержка Flink SQL больше подходят для разработки данных | Streaming API и Structured-Streaming API также могут использовать Spark SQL, который больше подходит для разработки данных. |
Отказоустойчивость | Механизм подтверждения | Точка сохранения распределенного снимка состояния | точка сохранения RDD |
государственное управление | Государственное управление Trident | Состояние ключа и состояние оператора можно использовать для поддержки нескольких схем сохраняемости. | Существуют API-интерфейсы, такие как UpdateStateByKey для изменений с отслеживанием состояния, поддерживающие несколько схем сохранения. |
Режим обработки | обработка одного потока | обработка одного потока | Пакетная обработка микрофона |
Задерживать | миллисекунда | миллисекунда | секунды |
Семантическая гарантия | Хоть раз, ровно раз | Ровно раз, хотя бы раз | At Least Once |
Судя по результатам опроса, Flink и Spark Streaming API, механизм отказоустойчивости и механизм сохранения состояния могут решить некоторые проблемы, с которыми мы сталкиваемся в настоящее время при использовании Storm. Однако Flink ближе к Storm с точки зрения задержки данных и оказывает минимальное влияние на существующие приложения. И во внутреннем тесте компании пропускная способность Flink примерно в десять раз выше, чем у Storm. После всестороннего рассмотрения мы выбрали механизм Flink в качестве механизма разработки для хранилищ данных в реальном времени.
Что еще больше привлекло наше внимание, так это абстракция таблиц Flink и поддержка SQL. Хотя структурированные данные также можно обрабатывать с помощью движка Strom. Но ведь это все же API обработки на основе сообщений, который не может в полной мере насладиться удобством работы со структурированными данными на уровне кода. Flink не только поддерживает большое количество часто используемых операторов SQL, но также в основном охватывает наши сценарии разработки. Более того, таблицей Flink можно управлять через TableSchema, которая поддерживает разнообразные типы данных, структуры данных и источники данных. Может быть легко интегрирован с существующими системами управления метаданными или системами управления конфигурацией. На рисунке ниже хорошо видна разница между Storm и Flink в процессе разработки.
Рис. 4 Flink — сравнительная диаграмма StormПри разработке с помощью Storm логика обработки и реализация должны быть закреплены в коде Bolt. Flink может быть разработан с помощью SQL, код более читаем, а реализация логики гарантируется платформой с открытым исходным кодом, которая надежна и эффективна.Оптимизация конкретных сценариев требует только изменения реализации функции оптимизатора Flink SQL без влияет на логический код. Позволяет нам вкладывать больше энергии в разработку данных, а не в реализацию логики. Когда требуется сценарий с унифицированными автономными данными и калибром данных в реальном времени, нам нужно лишь немного изменить SQL-скрипт автономного калибра, что значительно повышает эффективность разработки. В то же время, сравнивая модели данных, используемые Flink и Storm на рисунке, Storm необходимо определить структуру данных через класс Java, а таблицу Flink можно определить через метаданные. Его можно хорошо сочетать с метаданными, управлением данными и другими системами разработки данных для повышения эффективности разработки.
Опыт использования Флинка
В процессе создания хранилища данных в реальном времени с помощью Flink-Table. Мы обобщаем опыт использования некоторых распространенных операций при создании хранилищ данных, таких как размерное расширение индикаторов данных, сопоставление данных по темам и операции агрегирования данных через Flink.
1. Расширение измерения
Для расширения измерений индикаторов данных мы используем службу измерений для получения информации об измерениях. Хотя службы измерения на основе Cellar обычно имеют задержки ответа менее 1 мс. Однако для дальнейшей оптимизации пропускной способности Flink мы используем метод доступа к асинхронному интерфейсу для ассоциации многомерных данных, что позволяет избежать использования вызовов RPC для воздействия на пропускную способность данных. Например, для некоторых потоков с большим объемом данных объем данных журнала трафика составляет порядка 10 Вт в секунду. При связывании UDF встроен механизм кэширования, который может исключить кэш в соответствии с частотой попаданий и временем, а также использовать связанное значение ключа для секционирования, что значительно сокращает количество запросов к внешним службам, эффективно уменьшая задержки обработки и внешние системы давление.
2. Ассоциация данных
Слияние субъектов данных — это, по сути, ассоциация нескольких источников данных, которая представляет собой просто операцию соединения. Таблица Флинка построена на концепции бесконечных потоков. При выполнении операции соединения невозможно связать две полные таблицы, такие как автономные данные. Принята схема связывания данных в пределах времени окна, что эквивалентно выполнению операции соединения путем перехвата данных за период времени из каждого из двух потоков данных. Это чем-то похоже на автономные данные, связанные путем ограничения разделов. В то же время следует отметить, что таблица ассоциации Flink должна иметь хотя бы одно условие ассоциации «равно», потому что для группировки будут использоваться значения по обе стороны от знака равенства. Поскольку Flink будет кэшировать все данные в окне для ассоциации, объем кэшированных данных пропорционален размеру связанного окна. Таким образом, связанный запрос Flink больше подходит для обработки некоторых сценариев, в которых временной диапазон связанных данных может быть ограничен бизнес-правилами. Например, связать журналы просмотров пользователя, разместившего заказ в течение 30 минут до покупки. Чрезмерно большие окна не только потребляют больше памяти, но и создают большие контрольные точки, что приводит к снижению пропускной способности или времени ожидания контрольных точек. В реальном производстве вы можете использовать RocksDB и включить режим добавочной точки сохранения, чтобы уменьшить влияние процесса Checkpoint на пропускную способность. Например, для некоторых сценариев, требующих длительного окна корреляции, коррелированные данные могут быть данными, полученными несколько дней назад. Для этих исторических данных мы можем понимать это как фиксированное «измерение». Исторические данные, которые необходимо связать, могут обрабатываться так же, как и многомерные данные: «кэш + автономное» хранилище данных, и связываться с интерфейсом. Кроме того, следует отметить, что Flink напрямую и последовательно связывает ассоциации с несколькими таблицами, поэтому в первую очередь необходимо обратить внимание на ассоциацию небольших наборов результатов.
3. Операция агрегации
При использовании операций агрегирования Flink поддерживает общие операции агрегирования, такие как сумма, экстремальное значение, среднее значение и т. д. Ложкой дегтя является поддержка Distinct.Решение, принятое до Flink-1.6, заключается в том, чтобы сначала сгруппировать поля дедупликации, а затем агрегировать их. В сценариях, где необходимо деагрегировать несколько полей, очень неэффективно выполнять только отдельные вычисления, а затем выполнять обработку сопоставления. С этой целью мы разработали специальный UDAF для реализации точной дедупликации MapView, неточной дедупликации BloomFilter и дедупликации HyperLogLog со сверхмалым объемом памяти для работы с различными сценариями дедупликации в реальном времени. Однако при использовании пользовательского UDAF следует учитывать, что режим RocksDBStateBackend требует много времени для сериализации и десериализации операции обновления большего ключа. Вместо этого рассмотрите возможность использования шаблона FsStateBackend. Еще один момент, который следует отметить, это то, что когда платформа Flink вычисляет аналитические функции, такие как Rank, она должна кэшировать все данные в каждом окне группировки для сортировки, что потребляет много памяти. В этом сценарии рекомендуется сначала преобразовать логику в TopN, чтобы увидеть, можно ли выполнить требование.
На следующем рисунке показан полный процесс использования механизма Flink для создания таблицы данных в реальном времени:
Рисунок 5 Блок-схема расчета в реальном времениРезультаты хранилища данных в режиме реального времени
Заменив исходный процесс хранилищем данных в реальном времени, мы абстрагируем каждый процесс производства данных от каждого уровня хранилища данных в реальном времени. Он реализует унификацию источников данных для всех приложений данных в реальном времени и обеспечивает согласованность индикаторов и измерений данных приложений. В сценарии, когда калибр данных изменяется несколько раз, мы можем выполнить переключение калибра всех приложений без изменения кода приложения путем изменения сведений и сводок хранилища. В процессе разработки строго контролируются уровни данных, разделение предметной области, стандартная спецификация организации контента и правила именования. Сделайте связь разработки данных более четкой и уменьшите связанность кода. В сочетании с использованием Flink SQL для разработки код стал более лаконичным. Объем кода для одного задания был уменьшен в среднем с 300+ строк кода JAVA до десятков строк сценария SQL. Время разработки проекта также значительно сократилось, и нередко один человек разрабатывает несколько индикаторов данных в реальном времени в день.
Кроме того, мы можем проводить целевую оптимизацию производительности и настройку параметров в соответствии с различными характеристиками рабочего контента на каждом уровне хранилища данных. Например, уровень ODS в основном выполняет такие операции, как анализ и фильтрация данных, и не требует вызовов RPC и операций агрегирования. Мы оптимизируем процесс анализа данных, чтобы уменьшить ненужный анализ полей JSON и использовать более эффективные пакеты JSON. С точки зрения распределения ресурсов, один ЦП может быть сконфигурирован только с 1 ГБ памяти для удовлетворения спроса. Уровень агрегации в основном выполняет операции агрегации и ассоциации, которые могут повысить производительность и снизить затраты за счет оптимизации алгоритмов агрегации и совместной работы с внутренней и внешней памятью. В конфигурации ресурсов также будет выделено больше памяти, чтобы избежать переполнения памяти. Благодаря этим методам оптимизации, несмотря на то, что производственная линия хранилища данных в реальном времени длиннее исходного процесса, задержка данных существенно не увеличивается. В то же время вычислительные ресурсы, используемые приложениями данных в реальном времени, также значительно сокращаются.
Перспектива
Наша цель — превратить хранилище данных в режиме реального времени в систему данных, которая может соответствовать по точности и согласованности данным автономного хранилища. Предоставление своевременных и надежных услуг передачи данных для продавцов, бизнес-персонала и пользователей Meituan. В то же время он служит унифицированным экспортом данных о прибытии еды в режиме реального времени, помогая другим бизнес-подразделениям группы. В будущем мы будем уделять больше внимания надежности данных и управлению индикаторами данных в реальном времени. Установите надежный мониторинг данных, обнаружение родства данных, механизм перекрестной проверки. Своевременно отслеживайте и предупреждайте об аномальных данных или задержке данных. В то же время процесс разработки оптимизируется, а стоимость разработки обучения данным в реальном времени снижается. Позвольте большему количеству людей, нуждающихся в данных в режиме реального времени, решать проблемы самостоятельно.
использованная литература
Сравнение производительности между Flink и Storm, фреймворком для потоковых вычислений
Об авторе
Вэй Лунь, руководитель отдела данных в реальном времени в отделе технологий общественного питания Meituan Daodian, присоединился к Meituan в 2017 году и долгое время занимался разработкой платформ данных, вычислением данных в реальном времени и архитектурой данных. Есть некоторый опыт и результаты использования Flink для производства данных в реальном времени и повышения эффективности производства. В то же время он также активно продвигает практический опыт Flink в области обработки данных в реальном времени.
Предложения о работе
Студенты, которые заинтересованы в разработке данных и раскрытии ценности данных через сервисный бизнес, могут отправить свои резюме наhuangweilun@meituan.com. У нас есть много неизвестных, но значимых областей, которые ждут вас для изучения в хранилище данных в реальном времени, управлении данными в реальном времени, среде разработки продуктов с данными в реальном времени и инновационных продуктах на основе данных для продаж и продавцов.