Практика Flink в Vipshop

Redis Kafka Flink HDFS

Эта статья взята из выступления Ван Синьчуня 29 июля 2018 года на оффлайновой станции Meetup · Шанхайской станции китайского сообщества Flink. Ван Синьчунь в настоящее время отвечает за контент, связанный с платформой реального времени в Vipshop, в основном включая вычислительную среду в реальном времени и предоставление основных данных в реальном времени, а также работу платформы машинного обучения. Ранее в Meituan Dianping он также отвечал за платформу больших данных. Он накопил богатый опыт работы в области обработки больших данных в режиме реального времени.

Основное содержание этой статьи в основном включает следующие аспекты:

  1. Статус платформы реального времени Vipshop
  2. Практика Flink в Vipshop
  3. Flink On K8S
  4. Последующее планирование

1. Статус-кво платформы реального времени Vipshop

В настоящее время платформа реального времени Vipshop представляет собой не единую вычислительную среду, а три основные вычислительные среды, включая Storm, Spark и Flink. По историческим причинам текущее количество рабочих мест на платформе Storm является самым большим, но с прошлого года фокус бизнеса постепенно сместился на Flink, поэтому количество приложений на Flink в этом году значительно увеличилось.

Основной бизнес платформы реального времени состоит из восьми частей: рекомендации в реальном времени, как ключевой бизнес электронной коммерции, включают в себя множество функций в реальном времени; большой канбан продвижения, включая статистические показатели различных размеров (например, заказы различные измерения, UV, коэффициент конверсии, воронка и т. д.) для руководства, операций и принятия решений о продукте; очистка данных в реальном времени, сбор данных с пользовательских сайтов, выполнение очистки и корреляции в реальном времени, а также предоставление более качественных данных для последующих предприятий. ; помимо интернет-финансов, контроля рисков безопасности, сравнения цен с друзьями и другими предприятиями, а также Logview, Mercury, Titan в качестве внутренней системы мониторинга услуг, системы синхронизации данных в реальном времени VDRC и т. д.

В обязанности платформы реального времени в основном входит вычислительная платформа в реальном времени и базовые данные в реальном времени. Основанная на вычислительных платформах, таких как Storm, Spark и Flink, вычислительная платформа реального времени гарантирует мониторинг и стабильность, а также обеспечивает ввод и вывод данных для развития бизнеса. Базовые данные в режиме реального времени включают в себя определение и нормализацию скрытых точек восходящего потока, очистку и расширение данных о поведении пользователей, журналы MySQL Binlog и другие данные для обеспечения данных контроля качества нисходящего потока.

В архитектурный проект включены два источника данных. Один из них — это данные скрытых точек в таких приложениях, как App, WeChat и H5, а исходные данные собираются и отправляются в kafka; другой — это журнал MySQL Binlog онлайн-данных в реальном времени. Данные очищаются и коррелируются в вычислительной среде, а исходные данные предоставляются с более простыми в использовании данными для последующих бизнес-приложений (включая автономные расширенные таблицы и т. д.) посредством ETL в реальном времени.

2. Практика Flink в Vipshop

Сценарий 1: Канбан Dataeye в реальном времени

Канбан Dataeye в режиме реального времени предназначен для поддержки расчета в реальном времени всех данных закопанных точек, данных заказов и т. д., которые имеют характеристики большого объема данных, и существует множество измерений, которые необходимо учитывать, например, вся станция. , вторичная платформа, отдел, расписание, толпа, виды деятельности, измерения времени и т.д., повышают сложность расчета, а показатели вывода статистических данных могут достигать сотен тысяч в секунду.

В качестве примера UV-вычислений встроенные данные в Kafka сначала очищаются, а затем связываются с данными Redis, и связанные данные записываются в Kafka; последующие вычислительные задачи Flink используют связанные данные Kafka. Обычно объем результатов расчета задачи также велик (из-за большого количества расчетных измерений и показателей, которое может достигать десятков миллионов), а вывод данных также буферизуется через Kafka, и, наконец, синхронизируется с HBase с помощью задача синхронизации для отображения данных в реальном времени. Задача синхронизации ограничит текущий поток данных, записываемых в HBase, и объединит индикаторы одного типа для защиты HBase. В то же время есть еще одно вычислительное решение для аварийного восстановления.

При использовании Storm для вычислений в вычислительном движке Redis необходимо использовать в качестве промежуточного хранилища состояния.После перехода на Flink Flink сам имеет хранилище состояний, что экономит место на диске; поскольку ему не требуется доступ к Redis, это также повышает производительность и Общие ресурсы.Потребление снижено до 1/3 от первоначального.

В процессе постепенного переноса вычислительных задач со Storm на Flink двусторонние решения были последовательно перенесены, а вычислительные задачи и задачи синхронизации были разделены одновременно, что уменьшило нагрузку на запись данных в HBase.

После перехода на Flink необходимо отследить и исправить некоторые проблемы. Чтобы FlinkKafkaConsumer по бизнес-причинам модифицировал Aotu Commit в kafka и установил смещение, нужно реализовать функцию поддержки переключения кластера kafka. Данные состояния без окна необходимо очистить вручную. Существует также общая проблема с вычислительными фреймворками — необходимо решить проблему перекоса данных. При этом для задачи отслеживания количества задач синхронизации Storm может получить значение от Redis, а Flink остается только ждать.

Сценарий 2: данные Kafka попадают в HDFS

Раньше его реализовывал Spark Streaming, но сейчас он постепенно переходит на Flink.OrcBucketingTableSinkПоместите данные скрытых точек в таблицу Hive на HDFS. При обработке Flink скорость записи одной задачи может достигать примерно 3,5 К/с. После использования Flink потребление ресурсов снижается на 90 %, а задержка уменьшается с 30 с до менее чем 3 с. В настоящее время Flink по-прежнему поддерживает Spark Bucket Table.

Сценарий 3: ETL в реальном времени

Для обработки ETL болевой момент заключается в том, что таблица словаря хранится в HDFS и постоянно изменяется, а поток данных в реальном времени необходимо объединять со таблицей словаря. Изменения в таблице словаря вызваны автономными пакетными задачами, текущая практика заключается в использованииContinuousFileMonitoringFunctionа такжеContinuousFileReaderOperatorРегулярно отслеживайте изменения данных HDFS, постоянно обновляйте новые данные и используйте последние данные для объединения данных в реальном времени.

Мы планируем использовать более общий подход для поддержки соединения между таблицами и потоками Hive, чтобы добиться эффекта автоматической отправки данных после изменения данных таблицы Hive.

3. Флинк на K8S

В Vipshop есть несколько различных вычислительных сред, включая вычисления в реальном времени, машинное обучение и автономные вычисления, поэтому для управления необходима единая базовая структура, поэтому Flink перенесен на K8S.

На K8S используются сетевые компоненты Cisco, а каждый докер-контейнер имеет независимый ip, который также виден внешнему миру. Общая архитектура устройства слияния платформы реального времени показана на следующем рисунке.

Между схемой реализации Vipshop на K8S и схемой, предоставленной сообществом Flink, все еще есть большая разница. Vipshop использует режим K8S StatefulSet для развертывания и внутренне реализует некоторые интерфейсы, связанные с кластером. Задание соответствует мини-кластеру и поддерживает высокую доступность. Для Flink основной причиной использования StatefulSet является то, что имя хоста модуля упорядочено; потенциальные преимущества этого:

  1. Поды с именами хостов -0 и -1 могут быть напрямую назначены менеджером заданий; кластер можно запустить с набором состояний, и должно быть два развертывания; менеджер заданий и менеджер задач — это отдельные развертывания.

  2. После сбоя модуля по разным причинам, поскольку имя хоста модуля, повторно запущенного StatefulSet, остается неизменным, скорость восстановления кластера теоретически может быть выше, чем скорость развертывания (имя хоста каждого развертывания является случайным).

Описание параметров переменных окружения, которые необходимо задать в сценарии точки входа docker образа:

имя переменной окружения параметр Образец **** контента иллюстрировать
JOB_MANGER_HOSTS StatefulSet.name-0,StatefulSet.name-1 flink-cluster-0,flink-cluster-1 Имя хоста JM, короткое имя хоста; нельзя использовать полное доменное имя
FLINK_CLUSTER_IDENT namespace/StatefulSet.name default/flink-cluster корневой каталог для установки zk ha и проверки hdfs
TASK_MANAGER_NUMBER_OF_TASK_SLOTS containers.resources.cpu.limits 2 Количество слотов ТМ, заданное согласно resources.cpu.limits
FLINK_ZK_QUORUM env:FLINK_ZK_QUORUM 10.198.199.112:2181 Адрес HA ZK
JOB_MANAGER_HEAP_MB env:JOB_MANAGER_HEAP_MBvalue:containers.resources.memory.limit -1024 4096 Размер кучи JM должен быть меньше, чем container.resources.memory.limits из-за существования памяти вне кучи, иначе легко уничтожить OOM.
TASK_MANAGER_HEAP_MB env:TASK_MANAGER_HEAP_MB value: containers.resources.memory.limit -1024 4096 Размер кучи TM должен быть меньше, чем container.resources.memory.limits из-за существования памяти Netty вне кучи, в противном случае OOM легко убить

Другие конфигурации, такие как HDFS, от которых зависит кластер Flink, управляются и поддерживаются путем создания карты конфигурации.

kubectl create configmap hdfs-conf --from-file=hdfs-site.xml --from-file=core-site.xml

4. План дальнейших действий

В современных системах реального времени данные, которые должны обрабатываться платформой машинного обучения, распределяются по различным компонентам хранения данных, таким как Kafka, Redis, Tair и HDFS. большая проблема.Для текущего доступа к данным и их синтаксического анализа часто требуется много усилий, и основные болевые точки включают в себя:

  1. Для двоичных (PB/Avro и других форматов) данных в Kafka, Redis и Tair пользователи не могут быстро и напрямую понять схему и содержание данных, а стоимость сбора содержимого данных и общения с авторами высока.
  2. Из-за отсутствия независимой службы унифицированной системы данных доступ к двоичным данным в Kafka, Redis, Tair и т. д. должен основываться на информации, предоставленной писателем, такой как класс генерации прототипа, вики-определение формата данных и т. д. который является дорогостоящим в обслуживании и подвержен ошибкам.
  3. Отсутствие реляционной схемы не позволяет пользователям напрямую развивать бизнес на основе более эффективных и простых в использовании API уровня SQL или LINQ.
  4. Невозможно легко публиковать и обмениваться данными через независимый сервис.
  5. Данные в режиме реального времени не могут быть предоставлены непосредственно механизму пакетного SQL для использования.
  6. Кроме того, в доступе к большинству текущих источников данных также отсутствуют такие функции, как аудит, управление правами, мониторинг доступа и отслеживание.

UDM (унифицированная система управления данными) включает в себя такие модули, как Location Manager, Schema Metastore и Client Proxy.Основные функции включают в себя:

  1. Предоставляет услугу сопоставления имен с адресами, и пользователи получают доступ к данным через абстрактные имена, а не через конкретные адреса.

  2. Пользователи могут легко просматривать схему данных через графический веб-интерфейс и исследовать содержимое данных.

  3. Предоставляет прокси-сервер API клиента, который поддерживает дополнительные функции, такие как аудит, мониторинг и отслеживание.

  4. В таких фреймворках, как Spark/Flink/Storm, инкапсуляция этих источников данных предоставляется в наиболее удобном для использования виде.

Общая архитектура UDM показана на рисунке ниже.

К пользователям UDM относятся производители и потребители данных в режиме реального времени, машинного обучения и автономных платформ. При использовании Sql API или Table API сначала завершите регистрацию Schema, а затем используйте Sql для разработки, что сокращает объем кода разработки.

Внутренний процесс UDM иллюстрируется диаграммой последовательности доступа Spark к данным Kafka PB.

Во Flink UDMExternalCatalog используется для открытия моста между вычислительной инфраструктурой Flink и UDM.За счет реализации различных интерфейсов ExternalCatalog и реализации TableSourceFactory соответствующих источников данных выполняются различные функции, такие как схема и контроль доступа.