- Оригинальный адрес:Понимание ключевых концепций Apache Airflow
- Оригинальный автор:Dustin Stansbury
- Перевод с:Программа перевода самородков
- Постоянная ссылка на эту статью:GitHub.com/rare earth/gold-no…
- Переводчик:Starrier
- Корректор:yqian1991
Третья часть из четырех частей
существуетQuizletизПоиск оптимальной системы управления рабочим процессомизпервая частьиВторая частьВ , мы продвигаем потребность в системе управления рабочими процессами (WMS) в современной деловой практике и предоставляем список желаемых функций и функций, которые привели нас к выборуApache Airflowкак наш выбор WMS. Этот пост предназначен для того, чтобы предоставить любознательному читателю подробный обзор компонентов и операций Airflow. Мы будем реализовывать эту сериюпервая частьПример рабочего процесса, описанный в (см.Рисунок 3.1), чтобы представить ключевые концепции Airflow.
Рисунок 3.1: Пример рабочего процесса обработки данных.
Airflow — это WMS, то есть: он обрабатывает задачи и их зависимости как код, регулирует выполнение задач в соответствии с этими расписаниями и распределяет задачи для выполнения между рабочими процессами. Airflow предоставляет отличный пользовательский интерфейс для отображения состояния текущих и прошлых задач, а также позволяет пользователям вручную управлять выполнением и статусом задач.
Рабочие процессы представляют собой «ориентированные ациклические графы».
Рабочий процесс в Airflow — это набор задач с направленными зависимостями. В частности, Airflow использует направленные ациклические графы — или сокращенно DAG — для представления рабочих процессов. Каждый узел в графе представляет собой задачу, а ребра в графе представляют собой зависимости между задачами (граф должен быть безцикловым, поэтому циклические зависимости не могут возникать, что приводит к бесконечным циклам выполнения).
Рисунок 3.2Вверху показано, как можно монетизировать наш пример рабочего процесса как DAG в Airflow. обрати внимание наРисунок 1.1Структура плана выполнения нашего примера задачи рабочего процесса вРисунок 3.2Структура DAG аналогична.
Рисунок 3.2. Снимок экрана пользовательского интерфейса Airflow, представляющий пример DAG рабочего процесса. верхняя часть панели: 25 январяDagRun
вид диаграммы. Темно-зеленые узлы указываютTaskInstance
Статус «Успех». светло-зеленый изображаетTaskInstance
Состояние «бег».нижняя субпанель:example_workflow
Древовидная диаграмма DAG. На скриншоте выделены основные компоненты Airflow, в том числе датчики, операторы, задачи,DagRuns
иTaskInstances
.DagRuns
Представлены на диаграмме в виде столбцов —DagRun
Обозначено голубым цветом 25 января. Каждая ячейка на иллюстрации представляет собойTaskInstance
—— 25 январяperform_currency_conversion
миссияTaskInstance
(«Рабочее состояние») отображается синим цветом.
На высоком уровне группу обеспечения доступности баз данных можно рассматривать как контейнер, содержащий контекст задач, которые чрезвычайно зависят от того, когда и как устанавливать эти задачи. Каждая DAG имеет набор свойств, наиболее важным из которых является ееdag_id
, уникальный идентификатор среди всех DAG, егоstart_date
Он используется для описания времени выполнения задачи DAG,schedule_interval
Используется для указания частоты выполнения задачи. также,dag_id
,start_date
иschedule_interval
, каждая DAG может использовать наборdefault_arguments
для инициализации. Эти параметры по умолчанию наследуются всеми задачами в группе обеспечения доступности баз данных.
В следующем блоке кода мы определяем группу обеспечения доступности баз данных в Airflow, которая реализует рабочий процесс нашего примера игровой компании.
# 每个工作流/DAG 都必须要有一个唯一的文本标识符
WORKFLOW_DAG_ID = 'example_workflow_dag'
# 开始/结束时间是 datetime 对象
# 这里我们在 2017 年 1 月 1 号开始执行
WORKFLOW_START_DATE = datetime(2017, 1, 1)
# 调度器/重试间隔是 timedelta 对象
# 这里我们每天都执行 DAG 任务
WORKFLOW_SCHEDULE_INTERVAL = timedelta(1)
# 默认参数默认应用于所有任务
# 在 DAG 中
WORKFLOW_DEFAULT_ARGS = {
'owner': 'example',
'depends_on_past': False,
'start_date': WORKFLOW_START_DATE,
'email': ['example@example_company.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
# 初始化 DAG
dag = DAG(
dag_id=WORKFLOW_DAG_ID,
start_date=WORKFLOW_START_DATE,
schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
default_args=WORKFLOW_DEFAULT_ARGS,
)
Operators
,Sensors
и задачи
Хотя группа обеспечения доступности баз данных используется для организации и установки контекста выполнения, она не выполняет никаких реальных вычислений. Вместо этого задача на самом деле является элементом Airflow, который мы хотим выполнить «сделанную работу». Задачи имеют две характеристики: они могут выполнять некоторую операцию отображения, в этом случае ониOperator, или они могут приостановить выполнение зависимых задач до тех пор, пока не будет выполнено какое-либо условие, и в этом случае ониSensors. В принципе, Оператор может выполнять любую функцию, которая выполняется в Python. Точно так же датчики могут проверять состояние любого процесса или структуры данных.
В следующем блоке кода показано, как определить некоторые (гипотетические) классы операторов и датчиков для реализации нашего примера рабочего процесса.
##################################################
# 自定义 Sensors 示例/ Operators (NoOps) #
##################################################
class ConversionRatesSensor(BaseSensorOperator):
"""
An example of a custom Sensor. Custom Sensors generally overload
the `poke` method inherited from `BaseSensorOperator`
"""
def __init__(self, *args, **kwargs):
super(ConversionRatesSensor, self).__init__(*args, **kwargs)
def poke(self, context):
print 'poking {}'.__str__()
# poke functions should return a boolean
return check_conversion_rates_api_for_valid_data(context)
class ExtractAppStoreRevenueOperator(BaseOperator):
"""
An example of a custom Operator that takes non-default
BaseOperator arguments.
Extracts data for a particular app store identified by
`app_store_name`.
"""
def __init__(self, app_store_name, *args, **kwargs):
self.app_store_name = app_store_name
super(ExtractAppStoreRevenueOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print 'executing {}'.__str__()
# pull data from specific app store
json_revenue_data = extract_app_store_data(self.app_store_name, context)
# upload app store json data to filestore, can use context variable for
# date-specific storage metadata
upload_appstore_json_data(json_revenue_data, self.app_store_name, context)
class TransformAppStoreJSONDataOperator(BaseOperator):
"""
An example of a custom Operator that takes non-default
BaseOperator arguments.
Extracts, transforms, and loads data for an array of app stores
identified by `app_store_names`.
"""
def __init__(self, app_store_names, *args, **kwargs):
self.app_store_names = app_store_names
super(TransformJSONDataOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print 'executing {}'.__str__()
# load all app store data from filestores. context variable can be used to retrieve
# particular date-specific data artifacts
all_app_stores_extracted_data = []
for app_store in self.app_store_names:
all_app_stores_extracted_data.append(extract_app_store_data(app_store, context))
# combine all app store data, transform to proper format, and upload to filestore
all_app_stores_json_data = combine_json_data(all_app_stores_extracted_data)
app_stores_transformed_data = transform_json_data(all_app_stores_json_data)
upload_data(app_stores_transformed_data, context)
код определяетBaseSensorOperator
подклассConversionRatesSensor
. Этот класс реализует всеBaseSensorOperator
требуется объектpoke
метод. Если нижестоящая задача должна продолжить выполнение,poke
метод должен возвращатьTrue
, иначе возвратFalse
. В нашем примере этот датчик будет использоваться для определения доступности обменного курса для внешнего API.
ExtractAppStoreRevenueOperator
иTransformAppStoreJSONDataOperator
Оба класса наследуются от класса Airflow.BaseOperator
класс и реализуетexecute
метод. В нашем примере два классаexecute
Все методы берут данные из API хранилища приложений и преобразовывают их в предпочтительный для компании формат хранения. УведомлениеExtractAppStoreRevenueOperator
Также принимает пользовательский параметрapp_store_name
, который сообщает классу, откуда магазин приложений должен получить данные запроса.
Обратите внимание, что операторы и датчики обычно определяются в отдельных файлах и импортируются в то же пространство имен, где мы определяем DAG. Но мы также можем добавить эти определения классов в тот же файл определения DAG.
Формально Airflow определяет задачи, которые должны быть созданы как классы датчиков или операторов. Экземпляр задачи должен предоставлять уникальныйtask_id
и контейнер DAG для добавления задач (примечание: в версиях выше 1.8 объект DAG больше не требуется). В приведенном ниже блоке кода показано, как создать все задачи, необходимые для выполнения примера рабочего процесса. (Примечание: мы предполагаем, что все операторы, упомянутые в примере, определены или импортированы в пространстве имен).
########################
# 实例化任务 #
########################
# 实例化任务来提取广告网络收入
extract_ad_revenue = ExtractAdRevenueOperator(
task_id='extract_ad_revenue',
dag=dag)
# 动态实例化任务来提取应用程序存储数据
APP_STORES = ['app_store_a', 'app_store_b', 'app_store_c']
app_store_tasks = []
for app_store in APP_STORES:
task = ExtractAppStoreRevenueOperator(
task_id='extract_{}_revenue'.format(app_store),
dag=dag,
app_store_name=app_store,
)
app_store_tasks.append(task)
# 实例化任务来等待转换率、数据均衡
wait_for_conversion_rates = ConversionRatesSensor(
task_id='wait_for_conversion_rates',
dag=dag)
# 实例化任务,从 API 中提取转化率
extract_conversion_rates = ExtractConversionRatesOperator(
task_id='get_conversion_rates',
dag=dag)
# 实例化任务来转换电子表格数据
transform_spreadsheet_data = TransformAdsSpreadsheetDataOperator(
task_id='transform_spreadsheet_data',
dag=dag)
# 从所有应用程序存储中实例化任务转换 JSON 数据
transform_json_data = TransformAppStoreJSONDataOperator(
task_id='transform_json_data',
dag=dag,
app_store_names=APP_STORES)
# 实例化任务来应用
perform_currency_conversions = CurrencyConversionsOperator(
task_id='perform_currency_conversions',
dag=dag)
# 实例化任务来组合所有数据源
combine_revenue_data = CombineDataRevenueDataOperator(
task_id='combine_revenue_data',
dag=dag)
# 实例化任务来检查历史数据是否存在
check_historical_data = CheckHistoricalDataOperator(
task_id='check_historical_data',
dag=dag)
# 实例化任务来根据历史数据进行预测
predict_revenue = RevenuePredictionOperator(
task_id='predict_revenue',
dag=dag)
Этот код создания экземпляра задачи выполняется в том же файле/пространстве имен, что и определение DAG. Мы видим, что код для добавления задач очень чистый и позволяет встроенную документацию с помощью аннотаций. Строки 10–19 демонстрируют одно из преимуществ определения рабочего процесса в коде. Мы смогли динамически определить три разные задачи для использованияfor
Цикл извлекает данные из каждого хранилища приложений. Этот подход, вероятно, не принесет нам большой пользы в этом небольшом примере, но по мере роста количества магазинов приложений преимущества будут становиться все более значительными.
Определение зависимостей задач
Ключевым преимуществом Airflow является простота и интуитивно понятные соглашения для определения зависимостей между задачами. В следующем коде показано, как мы определяем граф зависимостей задач для примера рабочего процесса:
###############################
# 定义任务依赖关系 #
###############################
# 依赖设置使用 `.set_upstream` 和/或
# `.set_downstream` 方法
# (in version >=1.8.1,也可以使用
# `extract_ad_revenue << transform_spreadsheet_data` 语法)
transform_spreadsheet_data.set_upstream(extract_ad_revenue)
# 动态定义应用程序存储依赖项
for task in app_store_tasks:
transform_json_data.set_upstream(task)
extract_conversion_rates.set_upstream(wait_for_conversion_rates)
perform_currency_conversions.set_upstream(transform_json_data)
perform_currency_conversions.set_upstream(extract_conversion_rates)
combine_revenue_data.set_upstream(transform_spreadsheet_data)
combine_revenue_data.set_upstream(perform_currency_conversions)
check_historical_data.set_upstream(combine_revenue_data)
predict_revenue.set_upstream(check_historical_data)
В то же время этот код выполняется в том же файле/пространстве имен, что и определение DAG. использование в зависимости от задачиset_upstream
иset_downstream
операторы для установки (но в версиях выше 1.8 используйте оператор сдвига<<
и>>
выполнять аналогичные операции более лаконично). Задача также может иметь несколько зависимостей одновременно (например,combine_revenue_data
) или ни одного (например, всеextract_*
Задача).
Рисунок 3.2 Верхняя субпанельГруппа DAG Airflow, созданная с помощью приведенного выше кода, отображается как пользовательский интерфейс Airflow (подробнее о пользовательском интерфейсе позже). Структура зависимостей DAG связана сРисунок 1.1План выполнения, который мы показываем для нашего примера рабочего процесса, очень похож. Когда DAG выполняется, Airflow использует эту структуру зависимостей, чтобы автоматически определять, какие задачи могут выполняться одновременно в любой момент времени (например, всеextract_*
Задача).
DagRuns и TaskInstances
После того как мы определили DAG, то есть создали экземпляры задач и определили их зависимости, мы можем выполнять задачи на основе параметров DAG. Ключевой концепцией Airflow являетсяexecution_time
. Когда планировщик Airflow работает, он определяет график запланированных дат для выполнения задач, связанных с DAG, через равные промежутки времени. Время выполнения из DAGstart_date
начать и повторять для каждогоschedule_interval
. В нашем примере время планирования равно(‘2017–01–01 00:00:00’, ‘2017–01–02 00:00:00’, ...)
. для каждогоexecution_time
, будет создаватьDagRun
и работать в контексте времени выполнения. следовательно,DagRun
это просто DAG с определенным временем выполнения (см.Нижняя часть рисунка 3.2.).
все сDagRun
сопутствующие задачи называютсяTaskInstance
. другими словами,TaskInstance
является экземпляром и имеетexecution_date
контекстные задания (см.Нижняя часть рисунка 3.2.).DagRun
песокTaskInstance
является основной концепцией Airflow. каждыйDagRun
and TaskInstance
связаны с записью в метабазе Airflow, в которой записан их статус (например, «в очереди», «выполняется», «сбой», «пропущено», «ожидается повторная попытка»). Чтение и обновление этих состояний является ключом к процессу планирования и выполнения Airflow.
Архитектура воздушного потока
По своей сути Airflow представляет собой систему очередей, построенную на метабазе. В базе данных хранится состояние задач в очереди, которое планировщик использует для определения приоритетов добавления других задач в очередь. Эта функциональность управляется четырьмя основными компонентами. (видетьЛевая часть рисунка 3.2.):
- метабаза: В этой базе данных хранится информация о статусе задач. База данных выполняет обновления, используя уровень абстракции, реализованный в SQLAlchemy. Этот уровень абстракции четко отделяет оставшуюся функциональность компонента Airflow от базы данных.
- планировщик: планировщик — это процесс, который использует определение DAG в сочетании с состоянием задачи в метаданных, чтобы решить, какие задачи необходимо выполнить, и определить приоритет выполнения задачи. Планировщик обычно работает как сервис.
-
Актуатор: Executor — это процесс очереди сообщений, связанный с планировщиком для определения рабочего процесса, который фактически выполняет каждое запланированное задание. Существуют различные типы исполнителей, каждый из которых использует класс, определяющий рабочий процесс для выполнения задач. Например,
LocalExecutor
Выполнение задач с использованием параллельного процесса, работающего на том же компьютере, что и процесс планировщика. другой какCeleryExecutor
Исполнитель выполняет задачи, используя рабочие процессы, находящиеся в отдельных кластерах рабочих машин. - Workers: это процессы, которые фактически выполняют логику задачи, определяемую используемым исполнителем.
Рисунок 3.2: Общая архитектура Airflow. Операции Airflow строятся на основе метабазы (или DAG), в которой хранятся состояния задач и рабочие процессы. Планировщики и исполнители отправляют задачи в очереди для выполнения рабочих процессов. Веб-сервер запускается (часто на том же компьютере, что и планировщик) и взаимодействует с базой данных, представляя статус задачи и журналы выполнения задач в веб-интерфейсе. Каждая цветная рамка указывает на то, что каждый компонент может существовать независимо от других, в зависимости от типа конфигурации развертывания.
Работа планировщика
Во-первых, операции планировщика Airflow больше похожи на черную магию, чем на логические программы. Тем не менее, если вы обнаружите, что отлаживаете его выполнение, понимание того, как работает планировщик, может сэкономить много времени и уберечь читателей от увязания в исходном коде Airflow (хотя мы настоятельно рекомендуем это!). Мы используем псевдокод. основные операции планировщика:
步骤 0. 从磁盘中加载可用的 DAG 定义(填充 DagBag)
当调度器运行时:
步骤 1. 调度器使用 DAG 定义来标识并且/或者初始化在元数据的 db 中的任何 DagRuns。
步骤 2. 调度器检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 worker 队列,将新排列的 TaskInstance 状态更新为数据库中的“排队”状态。
步骤 3. 每个可用的 worker 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录从“排队”更新为“运行”。
步骤 4. 一旦一个 TaskInstance 完成运行,关联的 worker 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
步骤 5. 调度器根据所有已完成的相关 TaskInstance 的状态更新所有活动 DagRuns 的状态(“运行”、“失败”、“完成”)。
步骤 6. 重复步骤 1-5
Web UI
В дополнение к основным компонентам планирования и выполнения Airflow также поддерживает полнофункциональные компоненты веб-интерфейса (см.Рисунок 3.2некоторые примеры пользовательского интерфейса), в том числе:
- Webserver: этот процесс запускает простое приложение Flask, которое считывает все состояния задач из метабазы и позволяет веб-интерфейсу отображать эти состояния.
- Web UI: этот компонент позволяет пользователям клиента просматривать и редактировать статус задачи в метабазе. Из-за связи между планировщиком и базой данных пользовательский веб-интерфейс позволяет пользователю манипулировать поведением планировщика.
- журнал выполнения: эти журналы записываются рабочими процессами и хранятся на диске или в удаленном файловом хранилище (например,GCSилиS3)середина. Веб-сервер получает доступ к журналу и предоставляет его веб-интерфейсу.
Хотя ни одно из этих дополнений не является необходимым для базовой работы Airflow, они отличают Airflow от других текущих средств управления рабочими процессами с функциональной точки зрения. В частности, пользовательский интерфейс и встроенные журналы выполнения позволяют пользователям проверять и диагностировать выполнение задач, а также просматривать и управлять состоянием задач.
Интерфейс командной строки
В дополнение к планировщику и веб-интерфейсу Airflow предоставляет функции надежности через интерфейс командной строки (CLI). В частности, когда мы разрабатывали Airflow, мы обнаружили, что следующие команды очень полезны:
-
airflow test DAG_ID TASK_ID EXECUTION_DATE
. Позволяет пользователям запускать задачи независимо, не затрагивая метабазу и не сосредотачиваясь на зависимостях задач. Эта команда отлично подходит для автономного тестирования базового поведения пользовательского класса Operator. -
airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE
. существуетSTART_DATE
иEND_DATE
Выполняйте заполнение исторических данных между ними без запуска планировщика. Это удобно, когда вам нужно изменить некоторую бизнес-логику существующего рабочего процесса и обновить исторические данные. (Обратите внимание, что засыпканенужныйсоздать в базе данныхDagRun
записи, потому что они не[SchedulerJob](https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L471)
класс работает). -
airflow clear DAG_ID
. УдалитьDAG_ID
в метабазеTaskInstance
записывать. Это может быть полезно при повторении функций рабочего процесса/DAG. -
airflow resetdb
: хотя обычно вы не хотите запускать эту команду очень часто, она очень полезна, если вам нужно создать «чистую историю», которая может возникнуть при первоначальной настройке Airflow (примечание: эта команда влияет только на базу данных, журнал не удален).
Подводя итог, мы предлагаем несколько более абстрактных концепций в качестве основы для Airflow. в этой сериипоследний выпуск части, мы обсудим некоторые из наиболее практических соображений при развертывании Airflow в производственной среде.
благодарныйLaura Oppenheimer.
Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.
Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллекти другие поля, если вы хотите видеть больше качественных переводов, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.