Практика использования системы анализа журналов Suning в реальном времени на основе Spark Streaming

Архитектура Kafka анализ данных Spark

предисловие

В настоящее время базовая вычислительная платформа, основанная на технологическом стеке Hadoop в отрасли, становится все более стабильной и зрелой, а вычислительная мощность больше не является основным узким местом. Разнообразные данные, сложные требования к бизнес-анализу, стабильность системы и надежность данных — эти мягкие требования постепенно стали основными проблемами, с которыми сталкиваются системы анализа журналов. В 2018 году интеграция онлайн и офлайн стала общей тенденцией. Suning.com предложила и внедрила модель двухлинейной интеграции, а также предложила общую стратегию умной розничной торговли. Суть ее заключается в том, чтобы предоставлять потребителям более качественные услуги на основе данных. Первый этап анализа данных закладывает прочную основу для операций с данными.

Введение в процесс и архитектуру анализа данных

бизнес фон

Онлайн- и оффлайн-операторы Suning предъявляют разнообразные требования к анализу данных и все более высокой своевременности. В настоящее время система анализа журналов в реальном времени обрабатывает миллиарды журналов трафика каждый день, что не только необходимо для обеспечения низкой задержки, отсутствия потери данных и т. д., но также сталкивается со сложной логикой анализа и расчета, все из которых выдвигают высокие стандарты. и высокие требования к построению системы. Как показано ниже:

  • Богатые источники данных: данные о трафике онлайн и офлайн, данные о продажах, данные обслуживания клиентов и т. д.
  • Разнообразные потребности бизнеса: поддержка потребностей в данных для маркетинга, закупок, финансов, продавцов цепочки поставок и т. д.

Для получения дополнительных галантерейных товаров, пожалуйста, обратите внимание на публичный аккаунт WeChat «AI Frontline» (ID: ai-front)

Процесс и архитектура

Базовая обработка данных системы анализа журналов Suning в реальном времени разделена на три этапа: сбор, очистка и расчет индекса, как показано на рис. 1.

  • Модуль сбора: собирает журналы каждого источника данных и отправляет Kafka в режиме реального времени через Flume.
  • Модуль очистки: получайте данные журнала в режиме реального времени, выполняйте обработку и преобразование данных и реализуйте задачи очистки на основе Storm.В настоящее время ежедневно обрабатываются миллиарды данных о трафике, а структурированные данные, обработанные задачами очистки, будут отправляться в Kafka. снова очередь.
  • Расчет индикатора: Получайте структурированные данные о трафике от Kafka в режиме реального времени и рассчитывайте соответствующие показатели в режиме реального времени. Существует два основных типа задач расчета индикатора: задачи Storm и задачи Spark Streaming. Оба метода имеют свои сценарии применения. Подходит Spark Streaming для сценариев квазиреального времени Его преимущества: высокая пропускная способность, поддержка стандартного SQL, простота разработки, поддержка расчета оконных функций Storm, Spark Благодаря поддержке, предоставляемой Suning Data Cloud Platform, Suning Data Cloud Platform в настоящее время интегрирует: Hive , Spark, Storm, Druid, ES, Hbase, Kafka и другие компоненты для разработки больших данных поддерживают требования группы к обработке и хранению больших данных.

После расчета индикатора данные в основном сохраняются в механизмах хранения, таких как HBase и Druid, а бизнес-система считывает рассчитанные данные индикатора в режиме реального времени для предоставления услуг анализа данных для операторов.

Spark Streaming в практике индикаторного анализа

Введение в потоковую передачу Spark

Как мы все знаем, Spark — это среда пакетной обработки, а Spark Streaming использует концепцию пакетной обработки для реализации вычислительной среды квазиреального времени. требованиям к задержке, как показано на следующем рисунке. Spark Streaming поддерживает несколько источников данных: Kafka, Flume, HDFS, Kenisis и т. д. Платформа изначально поддерживает запись на носители, такие как HDFS, и обычные реляционные базы данных.

По сравнению со Storm, Spark Streaming имеет архитектуру квазиреального времени, более высокую пропускную способность, поддерживает SQL, лучшую поддержку HDFS, базы данных и других носителей данных, прост в разработке и поддерживает функции Windows, которые могут поддерживать сложные расчеты оконных функций.

Анализ индикатора NDCG

Normalized Discounted Cumulative Gain, или NDCG, часто используется в качестве оценочного индекса для поискового ранжирования.В идеале, чем выше ранжирование результатов поиска, тем больше вероятность клика, то есть выше оценка (прирост). CG = сумма баллов отсортированных результатов, дисконтированная на основе рейтинга, для каждого результата балл * рейтинговый вес, вес = 1/log(1 + рейтинг), чем выше рейтинг, тем выше вес. Сначала мы вычисляем идеальную DCG (называемую IDCG), а затем вычисляем реальную DCG по результату клика пользователя, NDCG = DCG/IDCG, чем ближе значение к 1, тем лучше результат поиска. DCG Рассчитывается следующим образом:

Выполните поиск по ключевому слову «яблоко» на Suning.com и возьмите в качестве примера 4 результата в первой строке.

Подсчитано, что IDCG = 1, DCG = 0,5, NDCG = DCG / IDCG = 0,5, и, наконец, для каждого поиска рассчитывается показатель NDCG, который используется в качестве индекса оценки для оценки качества результатов поиска.

Разработка схемы расчета NDCG

По статистическому промежутку времени поискового поведения 86 % поисковых действий выполняются в течение 5 минут, 90 % — в течение 10 минут (интервал времени от начала поиска до последнего нажатия на список результатов). Установите значение 15 минут. Это создает две вычислительные трудности:

  • Расчет временного окна: каждый раз проводится общий анализ данных за первые 15 минут.
  • Дедупликация: гарантирует, что поиск выполняется только один раз в течение временного окна.

В конце концов, мы выбрали платформу Spark Streaming и использовали ее функцию Window для расчета временного окна. Временное окно составляет 15 минут, а шаг — 5 минут, что означает, что он рассчитывается каждые 5 минут. В каждом расчете выполняется только расчет NDCG для поведения поиска, инициированного в интервале [15 минут назад, 10 минут назад], так что двойной расчет не будет выполняться.

После разработки плана онлайн-тест быстро обнаружил проблему, а данные, сохраненные в течение 15 минут, потребляли слишком много ресурсов.В результате анализа было обнаружено, что данные поиска составляют лишь небольшую часть данных о трафике, а Задача очистки хранит отдельные поисковые данные в Kafka, а NDCG рассчитывается. Подписка на новые поисковые данные значительно снижает потребление ресурсов.

Обеспечение производительности и безопасности данных

Гарантия выполнения

Оценка емкости и расширение

Оценка мощностей — это не статическая работа

  • Журналы трафика растут, а вычислительная мощность системы ограничена
  • Большие рекламные акции могут вызвать дополнительные всплески данных.

Ввиду этих ситуаций наиболее важной гарантией является заранеее расширение мощностей в соответствии с ростом бизнеса. Расширение емкости зависит от возможностей горизонтального расширения системы. Путем настройки таких параметров, как количество разделов Kafka Topic, количество узлов обработки Storm и параллелизма, а также количество параллелизма Spark Streaming, производительность обработки данных гарантированно соответствует требованиям бизнеса. потребности.

Оптимизация расчетов многомерного анализа

Если взять в качестве примера индикатор NDCG, в настоящее время он поддерживает расчет комбинации 4 измерений: регион, город, канал и поисковый запрос.Для поддержки любой комбинации 4 измерений требуется 15 вычислений и 15 обновлений хранилища. операции выполняются в HBase. Как показано ниже.

Текущая степень детализации времени – только дни. Если добавить временные параметры, такие как часы, недели и месяцы, количество задач и объем хранилища удвоятся. В настоящее время более актуальным становится высокопроизводительный вычислительный движок OLAP для повышения эффективности анализа индикаторов.

Во второй половине 2016 года облачная платформа данных начала создание механизма OLAP и официально предоставила услуги Druid в 2017 году. Druid поддерживает общие вычисления агрегирования, такие как сумма, максимум, минимум, среднее значение, количество, количество отдельных элементов и т. д., а также доступ к данным в режиме реального времени из Kafka Его столбчатая структура хранения повышает эффективность поиска данных и повышает эффективность вычислений за счет предварительной агрегации данных. .

После предварительного исследования решения и тестирования производительности Druid значительно повысил эффективность расчета и анализа таких индикаторов, как NDCG, что упростило задачу анализа индикаторов, а возможность многомерного анализа индикаторов была передана Druid для решения.

защита данных

Гарантированная потеря данных

Задачи данных Storm и Spark часто необходимо перезапускать для операций публикации, что особенно важно для обеспечения того, чтобы данные не были потеряны в течение определенного периода времени. Его необходимо разложить, чтобы обеспечить два момента:

  • Источник данных гарантирует, что данные не будут потеряны
  • Задачи данных обеспечивают обработку данных.

Во-первых, Kafka гарантирует, что данные не будут потеряны благодаря механизмам хранения и резервного копирования.

Во-вторых, Storm предоставляет механизм подтверждения, гарантирующий, что данные должны быть обработаны.

Spark Streaming предоставляет механизм резервного копирования контрольной точки (журнала WAL). После сбоя или перезапуска задачи данные контрольной точки можно использовать для восстановления, чтобы убедиться, что данные обработаны. Однако в журнале wal будет храниться копия всех данных в HDFS, что очень трудоемко, Spark Streaming оптимизирован для Kafka и предоставляет прямой API Kafka.При записи журнала WAL вам нужно только записать смещение очереди Kafka.Когда задача возобновится, вы можете перечитать Данные Kafka по смещению Весь процесс показан на следующем рисунке.

ровно один раз семантические гарантии

Для данных о продажах необходимо не только обеспечить обработку данных, но и обеспечить, чтобы данные обрабатывались только один раз, а данные, включающие финансовые показатели продаж, должны быть точными на 100%.

Первое решение: архитектура Labmda + дедупликация Redis

  • Дедупликация в реальном времени: после расчета заказа номер заказа записывается в Redis, и номер заказа сравнивается, чтобы гарантировать, что данные не обрабатываются повторно.
  • Автономное обновление: пересчитывайте показатели продаж каждое утро и обновляйте данные показателей предыдущего дня.

Вторая схема: MPP + первичный ключ

  • Сценарии использования: подходит для внешних сценариев использования, внешние системы запрашивают и анализируют данные из данных Mpp.
  • Техническое решение: MPP выбирает базу данных PG CITUS, создает таблицу в базе данных MPP и устанавливает уникальные поля, такие как номер заказа, в качестве первичного ключа.

Эволюция и оптимизация будущей архитектуры

В настоящее время вся базовая система обработки основана на отраслевой структуре с открытым исходным кодом, и система далека от совершенства, особенно низкоуровневые данные — это относительно кропотливая и тяжелая работа, и часто возникают проблемы с качеством данных. нет системы мониторинга, она часто встречается пассивно. , решить проблему. Из-за приятного роста новых предприятий изменения логики очистки данных стали обычным явлением, а выпуски кода — частыми.

В конце 2017 года началась оптимизация архитектуры системы, в основном добавление двух модулей.

  • Мониторинг качества данных: настраивая правила мониторинга качества, он может выполнять проверку правил для данных в режиме реального времени и в автономном режиме. Он поддерживает два метода: выборочная проверка и полная проверка. Разработчики своевременно уведомляются об отклонениях данных с помощью сигналов тревоги.
  • Система конфигурации правил очистки данных: абстрагируйте логику очистки в настраиваемые правила и реализуйте изменение логики очистки данных, определяя правила очистки изменений. использовать Drools и Groovy.Метод взаимодействует для реализации конфигурации правил очистки.

Резюме и перспективы

Система обработки и анализа журналов, как внутренняя поддержка приложений высокого уровня, таких как интеллектуальный анализ данных и анализ BI, играет связующую роль, особенно для сценариев с несколькими бизнес-направлениями и большими объемами данных, без систематической поддержки платформы, больших данных. в итоге будут пустые разговоры. Я считаю, что не только алгоритмическая модель, но и лежащие в ее основе качество данных, своевременность и стабильность системы станут победителями и проигравшими умной розничной торговли.

об авторе

Ван Фупин, руководитель отдела технологии центров обработки данных Suning Tesco Big Data Center, работал старшим инженером отдела больших данных Baidu и архитектором отдела поиска и точности Yihaodian. На протяжении многих лет он занимается исследованием и разработкой больших данных, имеет глубокое понимание инструментов больших данных и машинного обучения, имеет богатый опыт в области вычислений в реальном времени, а также имеет глубокое понимание Storm и Spark. Потоковое. Страстно увлечен обменом и распространением технологий, в настоящее время сосредоточен на создании платформы анализа данных, стремясь связать моделирование данных с анализом данных на основе технологий Druid, Kylin и других OLAP, предоставить сервис индикаторов данных на уровне платформы и создать «данные». как услуга» универсальное решение.

благодарныйЦай ФанфанОбзор этой статьи.