Практика Флинка по вычислениям в реальном времени в Youzan

Понравилось

Введение

Эта статья в основном состоит из пяти частей:

Во-первых, это потрясающая архитектура платформы реального времени.

Во-вторых, почему мы выбрали Flink на этапе исследования. В этой части приведены некоторые сравнения структурированной потоковой передачи Flink и Spark, а также причины выбора Flink.

В-третьих, более важный контент, у Flink хорошая практика. Сюда входят некоторые подводные камни, с которыми мы столкнулись в процессе использования Flink, а также некоторый специфический опыт.

Четвертая часть — это некоторая практика вычислений в реальном времени и интерфейса на основе SQL.

Последние слова — это некоторые перспективы будущего Flink. Эту часть можно разделить на две части: одна часть посвящена тому, как наша компания будет более глубоко использовать Flink, а другая часть — некоторым новым функциям, которые Flink может добавить в будущем.


2. Архитектура платформы реального времени Youzan

Архитектура платформы реального времени Youzan состоит из нескольких основных компонентов.

Во-первых, для данных в режиме реального времени промежуточное программное обеспечение сообщений, безусловно, необходимо. В Youzan, помимо широко используемого в индустрии Kafka, есть еще и NSQ. В отличие от Kafka, NSQ разработан с использованием Go, поэтому компания запечатывает уровень клиента Java, чтобы гарантировать, что сообщение будет доставлено хотя бы один раз в режимах push и ack, поэтому коннектор также будет иметь относительно большой разрыв, особенно часть который реализует отказоустойчивость. В процессе реализации, ссылаясь на коннектор Rabbit MQ, официально предоставленный Flink, были внесены некоторые модификации в сочетании с характеристиками клиента NSQ.

Далее идет вычислительный движок.Самый старый это Storm,и на Storm еще выполняются некоторые задачи.Что касается новых задач,то они принципиально не будут разрабатываться на его основе,потому что кроме высокой стоимости разработки,семантическую поддержку,поддержку SQL , включая поддержку управления состоянием, не очень хорошо, а пропускная способность все еще относительно низкая.Перенос задач Storm на Flink также является одной из наших следующих задач. Существует также Spark Streaming.Условно говоря, у Spark относительно хорошая экология, но Spark Streaming — это микропакетная обработка, что накладывает на него много ограничений.Помимо высокой задержки, он также будет полагаться на внешнее хранилище для сохранения промежуточных состояние хранения. Flink — относительно новый движок в Youzan, зачем вводить Flink, когда есть Spark и Storm, о нем я расскажу в следующей части.

Механизм хранения, помимо традиционного MySQL, мы также используем HBase, ES и ZanKV. ZanKV — это распределенная база данных KV, совместимая с протоколом Redis, разработанным нашей компанией, поэтому давайте просто будем понимать ее как Redis.

Механизм OLAP реального времени основан на Druid и имеет очень хорошее применение в многомерной статистике.

И, наконец, наша живая платформа. Платформа реального времени обеспечивает функции управления кластером, управления проектами, управления задачами и мониторинга аварийных сигналов. .

Здесь кратко представлена ​​архитектура платформы реального времени, а затем следует этап исследования Flink в Youzan. В этом разделе я буду в основном сравнивать Spark Structured Streaming.


3. Почему стоит представить Flink

Что касается того, почему его сравнивают со структурированной потоковой передачей Spark (SSS)? Потому что это два репрезентативных движка в контексте SQLизации в реальном времени.

Во-первых, это производительность, сравните ее с нескольких точек зрения. Во-первых, это задержка.Нет никаких сомнений в том, что Flink как потоковый движок лучше, чем микропакетный движок SSS. Хотя Spark также представляет механизм непрерывных вычислений, он не так хорош, как Flink, с точки зрения семантической гарантии и зрелости. Насколько я знаю, они делают это, назначая rdd узлу на длительный срок.

Второй наиболее интуитивно понятный показатель — это пропускная способность, которая в некоторых сценариях немного уступает Spark. Но когда речь идет о задачах с большими промежуточными состояниями, управление состоянием Flink на основе RocksDB показывает свои преимущества.

Flink может использовать чистую память или RocksDB для управления промежуточными состояниями. Что касается RocksDB, то в простом понимании это встроенная база данных с кешем. Благодаря возможности сохранения на диск Flink может сохранять гораздо больший объем состояния, чем SSS, и не подвержен OOM. А в чекпойнте выбран инкрементальный режим, нужно только бэкапить sst файл, отличный от последнего чекпойнта. В процессе использования выяснилось, что RocksDB также может удовлетворить наши потребности в качестве производительности управления состоянием.

После разговора о производительности, давайте поговорим о SQLизации, которая сейчас также является общим направлением. Когда я начал экспериментировать с SSS, я попробовал несколько операций агрегирования в одном операторе SQL, но это вызвало исключение. Внимательно прочитав документацию, я обнаружил, что это действительно не поддерживается в SSS. Второе внятно также не поддерживается. Flink намного превосходит SSS по этим двум пунктам. Таким образом, с точки зрения SQL в реальном времени, Flink выиграл для себя еще одно голосование. Кроме того, у Flink более гибкие окна. Для вывода также упоминается модель DataFlow.Flink поддерживает операции удаления и обновления, а SSS поддерживает только операции обновления. (Здесь SSS основан на Spark версии 2.3)

Гибкость API. В SSS правда таблица приносит большое удобство, но для некоторых операций все же хочется оперировать в виде DStream или rdd, но SSS не обеспечивает такого преобразования, и можно только писать какие-то UDF. Но во Flink Table и DataStream можно гибко преобразовывать друг в друга, чтобы иметь дело с более сложными сценариями.


4. Практика Флинка в Youzan

Прежде чем приступить к использованию Flink, первое, что нужно рассмотреть, — это развертывание. Из-за существующего стека технологий мы выбрали развертывание на Yarn и использование режима Single Job.Хотя ApplicationMasters будет больше, это, несомненно, повысит изоляцию.

4.1 Проблема 1: FLINK-9567

Когда я начал развертывание, я столкнулся с довольно странной проблемой. Давайте сначала поговорим о предыстории.Поскольку он все еще находится на стадии исследования, используется очередь Yarn по умолчанию, которая имеет низкий приоритет и ее легко вытеснить при нехватке ресурсов. Однажды утром я запустил задачу и подал заявку на 5 контейнеров для запуска TaskExecutor, относительно простой потоковой задачи со статусом.Я хотел запустить ее на некоторое время, чтобы увидеть, стабильно ли это или нестабильно. Эта задача Flink в итоге заняла более 100 контейнеров, и она продолжает увеличиваться, но работают только пять контейнеров, остальные контейнеры имеют прописанные слоты, а слоты все простаивают. На следующих двух рисунках представлены задачи в нормальном состоянии и рассматриваемые задачи соответственно.

после ошибки

Прежде чем углубляться в детали этой проблемы, позвольте мне представить, как Flink интегрируется с Yarn. Согласно рисунку ниже, мы представляем, что эти компоненты делают один за другим снизу вверх.

TaskExecutor — это исполнитель фактической задачи, он может иметь несколько слотов, и каждый слот выполняет определенную подзадачу. Каждый TaskExecutor зарегистрирует свой собственный слот в SlotManager и сообщит о своем статусе, будь то занятое состояние или состояние ожидания.

SlotManager является не только менеджером слота, но и отвечает за предоставление необходимых слотов для запущенных задач. Текущее отставание по слот-заявкам также фиксируется. Если слотов недостаточно, подайте заявку на контейнер в ResourceManager Flink.

Ожидающие слоты Невыполненные заявки на слоты и счетчики

Менеджер ресурсов Flink отвечает за взаимодействие с менеджером ресурсов Yarn, выполняя ряд операций, таких как применение контейнеров, запуск контейнеров и обработка выхода из контейнера. Поскольку используется метод асинхронного приложения, также необходимо записывать текущее невыполнение контейнерных приложений, чтобы предотвратить получение слишком большого количества контейнеров.

Счетчик невыполненных запросов контейнера в ожидании контейнера

AMRMClient является исполнителем асинхронного приложения, и CallbackHandler уведомляет ResourceManager Flink, когда он получает контейнер и когда контейнер выходит.

Yarn ResourceManager является как бы распределителем ресурсов, отвечает за получение запросов на судно, и судно готово для Клиента.

Здесь сразу вводится много понятий, позвольте мне на простом примере описать роль этих компонентов в работе.

Во-первых, в нашей конфигурации 3 диспетчера задач, и каждый диспетчер задач имеет два слота, то есть всего требуется 6 слотов. В настоящее время есть 4 слота, и планировщику задач нужно еще два слота для запуска подзадач при обращении к слоту.

В это время SlotManager обнаруживает, что все слоты заняты, поэтому помещает запрос этого слота в ожидающие слоты. Таким образом, вы можете видеть, что счетчик ожидающих слотов только что подскочил с 0 до 2. После этого SlotManager подаст заявку на новый TaskExecutor в ResourceManager Flink, который может удовлетворить потребности этих двух слотов. Таким образом, ResourceManager Flink добавляет 1 к ожидающему запросу контейнера и запрашивает ресурсы у Yarn через клиент AMRM.

Когда Yarn подготавливает соответствующий контейнер, он уведомляет ResourceManager Flink через CallbackHandler. Flink запустит TaskExecutor в каждом полученном контейнере и уменьшит ожидающий запрос контейнера на 1. Когда ожидающий запрос контейнера станет равным 0, он немедленно вернется, даже если будет получен новый контейнер.

Когда TaskExecutor запускается, он регистрирует два своих собственных слота в SlotManager, и SlotManager завершает два незавершенных SlotRequests, уведомляя планировщик о том, что две подзадачи могут быть выполнены на этом новом TaskExecutor, а ожидающие запросы также устанавливаются на 0. , До сих пор все, как и ожидалось.

Так как же возникла эта лишняя проблема? Во-первых, давайте посмотрим, что сейчас это обычная работающая задача. Занимает 6 слотов.

Если в это время есть какие-то причины, которые вызывают ненормальное завершение работы TaskExecutor, например, вытеснение ресурсов Yarn. В это время Yarn уведомит ResourceManager Flink о том, что эти три контейнера вышли из строя ненормально. Таким образом, ResourceManager Flink немедленно подаст заявку на три новых контейнера. Мы обсудим здесь наихудший случай, потому что эта проблема нестабильна и не повторяется.

CallbackHandler дважды получил обратный вызов и обнаружил, что контейнер завершил работу ненормально, поэтому он немедленно подал заявку на новый контейнер, а ожидающие запросы контейнера также были установлены на 3.

Если задача перезапустится в это время, планировщик подаст заявку на 6 слотов в SlotManager, а в SlotManager нет доступных слотов, он подаст заявку на 3 контейнера в ResourceManager Flink, а ожидающих запросов контейнеров станет 6.

Окончательный результат показан на рисунке: было запущено 6 TaskExecutors с 12 слотами, но только 6 использовались нормально, а 6 оставались бездействующими.

В процессе исправления этого у меня было две попытки. При первой попытке, после аварийного выхода Контейнера, я не сразу подаю заявку на новый контейнер. Но проблема в том, что если Контейнер совершит ошибку в процессе запуска TaskExecutor, то этот компенсационный механизм будет потерян, а некоторые Слот-запросы будут заблокированы, потому что SlotManager уже подал заявку на Контейнеры для них.

Вторая попытка — проверить ожидающие слоты до того, как ResourceManager Flink подаст заявку на новый контейнер.Если текущее количество слотов может быть удовлетворено за счет невыполненных контейнеров, нет необходимости подавать заявку на новый контейнер.

4.2 Вопрос 2: Мониторинг

Вторая яма, на которую мы наступили во время использования, на самом деле связана с мониторингом задержки. В примере очень простая задача, два источника, два оператора кроме источника, степень параллелизма 2. На каждый источник и оператор в нем по две подзадачи.

Логика задачи очень простая, но когда мы включаем мониторинг задержек. Даже для такой простой задачи он будет записывать данные о задержке от каждой исходной подзадачи до каждой подзадачи оператора. Эти данные о задержке также включают среднюю задержку, максимальную задержку, 99-процентную задержку и т. д. Тогда можно придумать формулу, что количество задержанных данных равно количеству подзадач источника, умноженному на количество источников, умноженному на параллелизм оператора, умноженному на количество операторов. N = n (подзадач на источник) * n (источники) * n (подзадач на оператора) * n (оператор)

Здесь я делаю относительно простое предположение, то есть количество подзадач источника и количество подзадач алгоритма являются p-параллельными. Из приведенной ниже формулы видно, что количество наблюдений увеличивается квадратично по мере увеличения степени параллелизма. N = p^2 * n (источники) * n (оператор)

Если мы увеличим предыдущую задачу до 10 степеней параллелизма, мы получим 400 копий задержанных данных. Это может показаться не большой проблемой, на нормальную работу компонента это не влияет.

Однако в списке рассылки разработчиков Flink пользователь сообщил, что JobMaster зависает вскоре после включения мониторинга задержки. Он получил 24000+ данных мониторинга, и ConcurrentHashMap, содержащий эти данные, занимает в памяти 1,6 Гб памяти. В общем, сколько памяти даст JobMaster от Flink, я обычно выделяю 1-2 g, что в итоге приведет к долгому FullGC и OOM.

Итак, как решить эту проблему? Когда мониторинг задержек начал влиять на нормальную работу системы, проще всего его отключить. Однако при отключенном мониторинге задержек, с одной стороны, мы не можем знать задержку текущей задачи, а с другой стороны, нет возможности сделать какие-то тревожные функции по задержке.

Итак, другое решение заключается в следующем. Первый — Flink-10243, который предоставляет больше возможностей для детализации мониторинга задержки, уменьшая количество в источнике. Например, если мы используем режим Single для сбора этих данных, он будет записывать только задержку подзадач каждого оператора, игнорируя источник или подзадачи источника. Таким образом, может быть получена такая формула, которая также может уменьшить 400 задержек, генерируемых задачами с упомянутыми ранее десятью степенями параллелизма, до 40. Эта функция была выпущена в версии 1.7.0 и перенесена в версии 1.5.5 и 1.6.2.

Также Flink-10246 предлагает улучшить MetricQueryService. Он содержит несколько подзадач.Первые три подзадачи устанавливают выделенную низкоприоритетную систему ActorSystem для службы мониторинга, которая здесь может быть просто понята как предоставление низкоприоритетного потока для независимого пула потоков для обработки связанных задач. Его цель также состоит в том, чтобы предотвратить влияние задач мониторинга на основные компоненты. Эта функция была выпущена в версии 1.7.0.

Другой — Flink-10252, который все еще пересматривается и совершенствуется, чтобы контролировать размер сообщений мониторинга.


4.3 Конкретная практика 1

Далее я расскажу о некоторых конкретных применениях Flink в Youzan.

Первый — это Flink в сочетании с Spring. Почему мы должны объединять их?Во-первых, в Youzan есть много сервисов, которые предоставляют только интерфейс Dubbo, и пользователи часто получают клиент этого сервиса через Spring, и это также верно в некоторых приложениях для вычислений в реальном времени.

Кроме того, многие разработчики приложений для работы с данными также являются инженерами Java и надеются использовать Spring и некоторые компоненты экосистемы для упрощения разработки во Flink. Потребности пользователей должны быть удовлетворены. Далее я расскажу о некоторых типичных ошибках и о том, как их использовать в конце концов.

Типичным примером первой ошибки является запуск среды Spring в пользовательском коде Flink, а затем получение и вызов связанных компонентов в операторе. Но на самом деле последний Spring Context запускается на стороне клиента, то есть на стороне, которая отправляет задачу.На рисунке посередине красная рамка с надписью Spring Context, чтобы указать, где он запускается. Однако пользователь действительно находится в TaskSlot TaskManager, когда выполняется фактический вызов, и все они находятся в разных jvms, что явно неразумно. Итак, у нас снова вторая ошибка.

Вторая ошибка выглядит намного лучше первой, мы используем RichFunction в операторе и получаем Spring Context из конфигурационного файла в методе open. Но не будем говорить о том, зря ли запускать несколько Spring Contexts в одном TaskManager, с запуском двух Spring Contexts в одной JVM будут проблемы. Некоторым пользователям может показаться, что это непросто, просто установите TaskSlot на 1. Однако существует также механизм OperatorChain, связывающий несколько узко зависимых операторов для выполнения в одном TaskSlot. Тогда мы можем закрыть OperatorChain, верно? Тем не менее, Flink может выполнять оптимизацию на основе CoLocationGroup, помещать несколько подзадач в один TaskSlot и выполнять их по очереди.

Но на самом деле окончательное решение относительно простое, это не что иное, как использование режима синглтона для инкапсуляции SpringContext, чтобы в каждом jvm был только один, и через этот синглтон в методе open получить соответствующий бин операторной функции.

Однако при вызове службы Dubbo ответ часто составляет не менее 10 мс и более. Максимальная пропускная способность TaskSlot составляет всего одну тысячу, что можно назвать большой потерей производительности. Таким образом, чтобы решить эту проблему, мы можем использовать асинхронность и кэширование.Для вызовов, которые возвращают одно и то же значение несколько раз, мы можем использовать кэширование, и мы можем использовать асинхронность для повышения пропускной способности.

4.4 Особая практика 2

Но что, если вы хотите использовать асинхронность и кэширование одновременно? Сначала я думал, что эту функцию легко реализовать, но когда я действительно написал RichAsyncFunction, я обнаружил, что нет никакого способа использовать KeyedState, размещенный на Flink. Итак, первой мыслью было сделать LRU-подобный кэш для кэширования данных. Но это вообще не может использовать преимущества управления состоянием Flink. Поэтому я немного изучил реализацию.

Почему бы не поддержать это?

Когда запись входит в оператор, Flink сначала извлечет ключ и укажет KeyedState на пространство хранения, связанное с ключом.Рисунок указывает на пространство хранения, связанное с key4. Однако если асинхронная операция, связанная с ключом1, завершена в это время, и ожидается, что содержимое будет кэшировано, содержимое будет записано в пространство хранения, ограниченное ключом4. В следующий раз, когда запись, относящаяся к ключу 1, войдет в оператор, вернитесь к месту хранения, связанному с ключом 1, для поиска, но данные вообще не могут быть найдены, поэтому их необходимо запросить снова.

Таким образом, решение состоит в том, чтобы настроить оператора, чтобы каждая запись, поступающая в систему, указывала на место для хранения одного и того же открытого ключа. Используйте MapState для кэширования в этом пространстве. Наконец, функция, запускаемая оператором, наследует AbstractRichFunction для получения KeyedState в методе open и реализует интерфейс AsyncFunction для выполнения асинхронных операций.


5. Вычисление SQL в реальном времени и интерфейс

Сначала мы использовали метод SDK для упрощения разработки задач SQL в реальном времени, но это не очень удобно для пользователя, поэтому теперь мы говорим о задачах SQL в реальном времени на основе интерфейса и используем Flink в качестве базового движка. для выполнения этих задач.

При выполнении задач SQL в реальном времени первым является абстракция внешних систем.Источники данных и пулы данных абстрагируются в потоковые ресурсы.Пользователи регистрируют информацию схемы и метаинформацию своих данных в платформе, а платформа управляет чтением и записью. в соответствии с разрешением проектной группы пользователя. Если формат источника сообщения можно здесь унифицировать, это значительно упростит работу. Например, в Youzan пользователи, которые хотят получить доступ, должны убедиться, что сообщение находится в формате Json, а информация о схеме может быть сгенерирована напрямую через образец сообщения.

Следующим шагом является получение соответствующей информации о схеме и метаинформации в соответствии с источником данных и пулом данных, выбранным пользователем, регистрация соответствующего соединителя таблицы внешней системы в задаче Flink, а затем выполнение соответствующей инструкции SQL.

Попробуйте использовать UDF для расширения функций, которые не поддерживаются семантикой SQL.

Существует метаинформация между источниками данных и пулами данных, и можно получить возможные зависимости между задачами в реальном времени, а также можно отслеживать всю связь.


6. Будущее и перспективы

Пакетная обработка Flink и попытки модуля машинного обучения будут сравниваться со Spark для анализа преимуществ и недостатков. Он все еще находится на стадии исследования, и основное внимание уделяется комбинации Flink и Hive, что соответствует проблеме FLINK-10566.

Что касается разработки Flink, я уделяю больше внимания следующей оптимизации планирования и управления ресурсами и участвую в ней. Теперь планирование Flink и график выполнения задач связаны друг с другом с использованием относительно простого механизма планирования. Изолируя планировщик и делая его подключаемым, можно применять больше механизмов планирования. Кроме того, на основе нового планировщика может быть реализован более гибкий механизм пополнения и сокращения ресурсов для реализации Auto Scaling. Это может быть важной функцией в следующей версии. Соответствует двум выпускам FLINK-10404 и FLINK-10429.


Наконец, небольшая реклама, команда инфраструктуры группы больших данных Youzan в основном отвечает за платформу данных Youzan (DP), вычисления в реальном времени (Storm, Spark Streaming, Flink), офлайн-вычисления (HDFS, YARN, HIVE, SPARK). SQL), онлайн-хранилище (HBase), OLAP в реальном времени (Druid) и ряд других технических продуктов, с которыми можно связаться с заинтересованными партнерами.yangshimin@youzan.com