Почему Robinhood использует Airflow

задняя часть Python Программа перевода самородков редкоземельный

Robinhood распределяет множество задач через задания cron. Эти задания варьируются от анализа данных и агрегации показателей до брокерских операций, таких как выплата дивидендов. Первоначально мы использовали cron для планирования этих заданий, но по мере увеличения их количества и сложности это становилось все более и более сложным:

  • управление зависимостямитрудный. С помощью cron мы должны планировать нисходящие задания с наихудшей ожидаемой продолжительностью вышестоящего задания. Это становится все труднее по мере увеличения сложности этих заданий и их зависимостей.
  • обработка отказови оповещения должны управляться заданием. При наличии зависимостей, если задание не может обрабатывать повторные попытки и сбои восходящего потока, инженеры должны быть на связи.
  • отступлениетрудный. Мы должны просмотреть журналы или оповещения, чтобы проверить, как выполнялась работа в определенный день в прошлом.

Чтобы удовлетворить потребности планирования, мы решили отказаться от cron и заменить его чем-то, что решает вышеуказанные проблемы. Мы исследовали некоторые альтернативы с открытым исходным кодом, такие какPinball,Azkabanа такжеLuigi, и, наконец, решил использоватьAirflow.

Pinball

Разработанный Pinterest, Pinball обладает многими функциями распределенной, горизонтально масштабируемой системы управления рабочими процессами и планирования. Он решает множество проблем, упомянутых выше, но имеет мало документации и относительно небольшое сообщество.

Azkaban

Азкабан, разработанный LinkedIn, вероятно, является самой старой альтернативой, которую мы рассматривали. Он использует файлы свойств для определения рабочих процессов, тогда как в большинстве новых альтернатив используется код. Это затрудняет определение сложных рабочих процессов.

Luigi

Разработанный Spotify, Luigi имеет активное сообщество и, вероятно, наиболее близок к Airflow в нашем исследовании. Он использует Python для определения рабочих процессов и имеет простой пользовательский интерфейс. Но у Луиджи нет планировщика, и пользователям по-прежнему приходится полагаться на cron для планирования заданий.

Привет Воздушный поток!

Airflow, разработанный Airbnb, имеет растущее сообщество и, кажется, лучше всего подходит для наших целей. Это горизонтально масштабируемая распределенная система управления рабочими процессами, которая позволяет нам определять сложные рабочие процессы с помощью кода Python.

управление зависимостями

Использование воздушного потокаоператорв качестве базовой единицы абстракции для определения задач и использованияDAG(Направленный ациклический граф) Рабочий процесс определяется набором операторов. Операторы расширяемы, что упрощает настройку рабочих процессов. Существует 3 типа операторов:

  • действиеОператор, выполняющий какое-либо действие, например выполнение функции Python или отправку задания Spark.
  • перечислитьОператоры, перемещающие данные между системами, например из Hive в Mysql или из S3 в Hive.
  • датчикЗапускайте нижестоящие задачи в сети зависимостей при выполнении определенных условий, например, проверьте, доступен ли файл на S3, прежде чем использовать его ниже по течению. Датчики — это мощная функция Airflow, позволяющая нам создавать сложные рабочие процессы и легко управлять их предварительными условиями.

Ниже приведен пример того, как различные типы датчиков могут использоваться в типичном ETL (Преобразование извлечения данных и загрузка)процесс работы. В примере оператор датчика используется для ожидания доступности данных, а оператор сдвига используется для перемещения данных в нужное место. Затем оператор действия используется на этапе преобразования, а результат загружается с помощью оператора перехода. Наконец, мы используем оператора датчика для проверки правильности сохранения результатов.

| 传感器 -> 转移 -> | 动作 | -> 转移 -> 传感器 |
|      提取        |  转换 |      加载       |

Рабочий процесс ETL с использованием различных типов операторов Airflow

Устранение неполадок и мониторинг

Airflow позволяет нам настраивать конфигурацию политики повторных попыток для отдельных задач и может устанавливать задачи, которые терпят неудачу, повторяют попытку и запускаются.дольше, чем ожидалосьтревога в случае. Airflow имеет интуитивно понятный пользовательский интерфейс с некоторыми мощными инструментами для мониторинга и управления заданиями. Он предоставляет исторический обзор заданий и инструменты для управления состоянием заданий — например, завершение выполняющегося задания или повторный запуск задания вручную. Уникальной особенностью Airflow является возможность создавать диаграммы с использованием данных задания. Это позволяет нам создавать настраиваемые визуализации для тщательного мониторинга заданий и выступать в качестве отличного инструмента отладки при устранении неполадок заданий и планировании.

Масштабируемость

Операторы воздушного потока определяются с использованием классов Python. Это упрощает определение настраиваемых повторно используемых рабочих процессов путем расширения существующих операторов. Мы создали большой набор пользовательских операторов, некоторые известные примеры — OpsGenieOperator, DjangoCommandOperator и KafkaLagSensor.

Умный Крон

DAG воздушного потока определяются с использованием кода Python. Это позволяет нам определять более сложные расписания, чем cron. Например, некоторые из наших DAG нужно запускать только в течение торгового дня. С простейшим cron нам пришлось бы настроить его на работу во все дни недели, а затем обрабатывать праздничные дни в приложении.

Мы также используем датчики воздушного потока, чтобы начать работу сразу после закрытия рынка, даже если день открыт только половину дня. В следующем примере выполняется динамическое обновление на основе рыночного времени в заданную дату путем настройки операторов для рабочих процессов, требующих сложного планирования.

Рабочие процессы, которые динамически планируются на основе рыночного времени на заданную дату

засыпка

Мы используем Airflow для агрегации метрик и пакетной обработки данных. Поскольку потребности продолжают меняться, иногда нам нужно вернуться и изменить способ агрегирования определенных показателей или добавить новые. Для этого требуется возможность обратного заполнения данных за произвольный период времени в прошлом. Airflow предоставляет инструмент командной строки, который позволяет нам выполнять обратную засыпку в произвольные периоды времени с помощью одной команды, а также запускать обратную засыпку из пользовательского интерфейса. Мы используем сельдерей (нашимAsk Solemmake) распределите эти задачи по рабочим ящикам. Возможности распространения Celery позволяют нам использовать больше рабочих блоков при выполнении обратной засыпки, что делает обратную засыпку быстрой и легкой.

Распространенные ошибки и недостатки

В настоящее время мы используем Airflow 1.7.1.3, который отлично работает в продакшене, но имеет свои собственныеСлабые стороны и подводные камни.

  • проблема с часовым поясом- Airflow использует системный часовой пояс (вместо UTC) для планирования. Для этого требуется, чтобы вся настройка Airflow работала в одном часовом поясе.
  • планировщикЗапускайте запланированные задания и задания обратной засыпки отдельно. Это может привести к странным результатам, таким как заполнение, не соответствующее конфигурации max_active_runs группы обеспечения доступности баз данных.
  • Airflow в основном используется для пакетной обработки данных, поэтому его разработчики решили всегдаПодождите некоторое время перед запуском задания. Таким образом, для задания, выполнение которого запланировано на полночь каждый день, в контексте передается время выполнения «2016-12-31 00:00:00», но на самом деле это «2017-01-01 00:00». :00" это действительно работает. Это может сбивать с толку, особенно в заданиях, которые выполняются нечасто.
  • Случайная засыпка— По умолчанию Airflow попытается заполнить пропущенные задачи, когда DAG возобновляет работу после паузы или при добавлении новой DAG с начальной датой в прошлом. Хотя такое поведение ожидаемо, в конце концов, его нельзя обойти, и это может вызвать проблемы, если задание не должно быть заполнено. Представлен воздушный поток 1.8ближайший операторДля решения этой проблемы.

Суммировать

Airflow быстро превратился в важную часть нашей инфраструктуры Robinhood. Возможность определять DAG с помощью кода Python и расширяемого API делает Airflow настраиваемым и мощным инструментом. Надеемся, что эта статья будет полезна всем, кто изучает инструменты планирования и управления рабочими процессами для своих нужд. Мы рады ответить на любые вопросы. Если вас интересуют такие вещи, рассмотрите нашНабор персонала!

благодарныйArpan Shah,Aravind GottipatiJack Randall.

Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.


Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.