Построение и использование инструмента планирования рабочего процесса Airflow1.8

задняя часть база данных Python MySQL

Цель написания

В последнее время рабочей задаче необходимо перенести процесс ETL, который изначально использовал Kettle, на платформу Hadoop, поэтому необходимо найти инструмент для замены части рабочего процесса Kettle. В среде больших данных обычно используются Oozie, Airflow или Azkaban. После краткой оценки мы выбрали облегченный Airflow в качестве рабочего инструмента.

Airflow — это система управления распределением рабочих процессов, которая управляет процессами задач через направленный ациклический граф, устанавливая зависимости задач и планируя время. Airflow не зависит от задачи, которую мы хотим запустить, просто предоставьте Airflow название задачи и способ ее запуска в качестве задачи.

Процесс установки

Эта установкаAirflow 1.8, а не последней версии apache-airflow 1.9, основная причина в том, что все операции в версии 1.9 основаны на времени UTC, что делает настройку информации о расписании менее интуитивно понятной. Версия 2.0, находящаяся в настоящее время в разработке, имеет возможность устанавливать местный часовой пояс, но она не была опубликована.

подготовка системы

Python 3.5: среда Anaconda 4.2

MySQL 5.6: в режиме LocalExecutor вся информация о DAG сохраняется во внутренней базе данных.

Пользователи ОС: etl

создать базу данных

Серверная часть использует базу данных MySQL для сохранения информации о задаче, сначала создайте базу данных и пользователя в базе данных. следующее

create database airflow;

grant all privileges on airflow.* to 'airflow'@'%' identified by 'airflow';

flush privileges;

переменная среды

Во время работы Airflow будет использовать глобальные переменные среды, поэтому сначала необходимо добавить следующие переменные в ~/.bash_profile.

export AIRFLOW_HOME=/home/etl/airflow

установить воздушный поток

После использования pip для установки воздушного потока и зависимых драйверов базы данных требуется инициализация. Этот процесс создаст файл конфигурации по умолчаниюariflow.cfg, и последующие изменения конфигурации выполняются через этот файл.

# 默认安装1.8版本,因为1.9版本的名字变成了apache-airflow
pip install airflow

# 因为需要连接MySQL数据库,所以需要安装驱动
pip install airflow[mysql]

# 初始化数据库,这一步是必须的,否则无法生成默认配置文件
airflow initdb

# 创建需要的文件夹,否则运行时会报错找不到默认文件夹
mkdir dags
mkdir logs

Включить веб-разрешения

По умолчанию веб-консоль управления airflow не имеет пароля пользователя, поэтому перед переходом на официальную среду нам необходимо включить механизм разрешений.

существуетairflow.cfgУстановите следующие параметры в

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

После включения разрешений первоначальный пользователь должен быть установлен вручную через python REPL перед первым входом в систему.

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'airflow@xxx.com'
user.password = 'airflow'

session = settings.Session()
session.add(user)
session.commit()
session.close()

exit()

Изменить подключение к базе данных

Поскольку в качестве метабазы ​​используется MySQL, вам также необходимо настроить параметры подключения к базе данных. существуетairflow.cfgУстановите следующие параметры в

[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow:airflow@192.168.100.57:3306/airflow?charset=utf8

После изменения метода подключения к базе данных необходимо снова выполнить операцию инициализации.

airflow initdb

Изменение другого параметра

Есть еще какие-то разрозненные конфигурации, которые непросто классифицировать, поэтому они записаны здесь.

Конфигурация для отправки уведомления по электронной почте после успешного выполнения задачи, сбоя или повторной попытки

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
smtp_host = smtp.mxhichina.com
smtp_starttls = False
smtp_ssl = False

# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = bialert@xxx.com
smtp_password = ******
smtp_port = 25
smtp_mail_from = bialert@xxx.com

По умолчанию веб-интерфейс отображает образцы DAG очень запутанно. Помимо удаления образца DAG из базы данных, вы также можете настроить, чтобы эта часть образца не отображалась.

# 不显示样例DAG
load_examples = False

Механизм подхвата Airflow будет выполнять задания, которые не были выполнены до текущего времени, по очереди, когда вы запускаете DAG. Преимущество этого в том, что недостающие задачи планирования могут быть дополнены, но во многих случаях нам не нужна эта функция. Изменив конфигурацию, наверстывание можно отключить следующим образом.

[scheduler]
# 避免执行catchup,即避免把当前时间之前未执行的job都执行一次
catchup_by_default = False

ВЕБ-управление

Веб интерфейс

На странице порта 8080 по умолчанию вы можете выполнять ежедневные операции с DAG, включая, помимо прочего, запуск, остановку, просмотр журналов и т. д. Интерфейс показан ниже

image-20180529152649601

Управление скриптами

Текущая версия Airflow не предоставляет сценарий завершения работы и не предоставляет удобный способ полного удаления DAG. Чтобы облегчить тестирование, я написал сценарий управления для выполнения связанных задач.

Скрипт называется следующим образом

$ ./airflow_util.py -h
usage: airflow_util.py [-h] [-k] [-s] [--clear CLEAR] [--delete DELETE]

optional arguments:
  -h, --help       show this help message and exit
  -k, --kill       关闭Airflow
  -s, --start      启动Airflow
  --clear CLEAR    删除历史日志
  --delete DELETE  提供需要删除的DAG ID

Исходный код скрипта управления выглядит следующим образом

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import pymysql
import subprocess
import time

#连接配置信息
config = {
     'host':'127.0.0.1',
     'port':3306,
     'user':'airflow',
     'password':'airflow',
     'db':'airflow',
     'charset':'utf8',
     }

# 删除历史日志
def clear_log(num):
    print("Clear logs before {0} days ...".format(num))
    cmd = "find %s -maxdepth 1 -type d -mtime +%d | xargs -i rm -rf {}" 
    subprocess.call(cmd % ('./logs',num), shell=True)
    subprocess.call(cmd % ('./logs/scheduler',num), shell=True)

# 通过杀掉后台进程来关闭Airflow
def kill_airflow():
    print("Stoping Airflow ...")
    # exclude current file in case the file name contains keyword 'airflow'
    cmd = "ps -ef | grep -Ei 'airflow' | grep -v 'grep' | grep -v '%s' | awk '{print $2}' | xargs -i kill -9 {}" % (__file__.split('/')[-1])
    subprocess.call(cmd, shell=True)

# 启动Airflow 
def start_airflow():
    kill_airflow()
    time.sleep(3)

    print("Starting Airflow Webserver ...")
    subprocess.call("rm logs/webserver.log", shell=True)
    subprocess.call("nohup airflow webserver >>logs/webserver.log 2>&1 &", shell=True)
    
    print("Starting Airflow Scheduler ...")
    subprocess.call("rm logs/scheduler.log", shell=True)
    subprocess.call("nohup airflow scheduler >>logs/scheduler.log 2>&1 &", shell=True)

# 删除指定DAG ID在数据库中的全部信息。
# PS:因为SubDAG的命名方式为 parent_id.child_id ,所以也会把符合这种规则的SubDAG删除!
def delete_dag(dag_id):
    # 创建连接
    connection = pymysql.connect(**config)
    cursor = connection.cursor()

    sql="select dag_id from airflow.dag where (dag_id like '{}.%' and is_subdag=1) or dag_id='{}'".format(dag_id, dag_id)
    cursor.execute(sql)
    rs = cursor.fetchall()
    dags = [r[0] for r in rs ] 

    for dag in dags:
        for tab in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag_stats", "dag" ]:
            sql="delete from airflow.{} where dag_id='{}'".format(tab, dag)
            print(sql)
            cursor.execute(sql)

    connection.commit()
    connection.close()

#
def main_process():
    parser = argparse.ArgumentParser()

    parser.add_argument("-k", "--kill", help="关闭Airflow", action='store_true')
    parser.add_argument("-s", "--start", help="启动Airflow", action='store_true')
    parser.add_argument("--clear", help="删除历史日志", type=int)
    parser.add_argument("--delete", help="提供需要删除的DAG ID")

    args = parser.parse_args()

    if args.kill:
        kill_airflow()
    if args.start:
        start_airflow()
    if args.clear:
        clear_log(args.clear)
    if args.delete:
        delete_dag(args.delete)

if __name__ == '__main__':
    main_process()

Написание DAG

Рабочие процессы в собственном Airflow определяются простыми скриптами Python (есть некоторые сторонние расширения, которые позволяют определять режим перетаскивания).

Обычный ДАГ

Для сценариев, где задач не слишком много, все задачи можно определить в одном py-файле. Таким образом, определяются 4 задачи

img

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime

template_caller = "sh /home/etl/jupyter_home/etl_script/spark_scheduler/subdir/caller_spark.sh -m {0} -f {1} "
template_file = '/home/etl/jupyter_home/etl_script/spark_scheduler/subdir/{0}'
default_spark_master = 'spark://192.168.100.51:7077'

#-------------------------------------------------------------------------------
default_args = {
    'owner': '测试',
    'depends_on_past': False,
    'start_date': datetime(2018,4,24,14,0,0),
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
}

#-------------------------------------------------------------------------------
dag = DAG(
    'demo_spark_normal',
    default_args=default_args,
    description='测试-调用Spark',
    schedule_interval='*/20 * * * *')

#-------------------------------------------------------------------------------
# spark operator
cmd = template_caller.format(default_spark_master, template_file.format('hive_rw.ipynb'))
t1 = BashOperator( task_id='spark_hive', bash_command=cmd , dag=dag)

cmd = template_caller.format(default_spark_master, template_file.format('jdbc_rw.ipynb'))
t2 = BashOperator( task_id='spark_jdbc', bash_command=cmd , dag=dag)

cmd = template_caller.format(default_spark_master, template_file.format('csv_relative.py'))
t3 = BashOperator( task_id='spark_csv', bash_command=cmd , dag=dag)

cmd = template_caller.format(default_spark_master, template_file.format('pure_sql.sql'))
t4 = BashOperator( task_id='spark_sql', bash_command=cmd , dag=dag)
#-------------------------------------------------------------------------------
# dependencies
t1 >> t2 >> t4
t1 >> t3 >> t4

SubDAG

Когда в рабочем процессе слишком много задач, дисплей пользовательского интерфейса будет переполнен В этом случае это может быть достигнуто путем классификации задач по разным SubDAG. С точки зрения конкретного написания его можно разделить на один файл py и несколько схем файлов py.

отдельный файл

В этом случае мы записываем и DAG, и SubDAG в py-файл. Преимущество в том, что легко записать только один файл, а недостаток в том, что если задач много, файлом непросто управлять.

img

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

PARENT_DAG_NAME = 'atest_04'
#CHILD_DAG_NAME = 'child_dag'

default_args = {
        'owner': '测试',
        'depends_on_past': False,
}

main_dag = DAG(
  dag_id=PARENT_DAG_NAME,
  default_args=default_args,
  description='测试-内嵌SubDAG',
  start_date=datetime(2018,4,21,16,0,0),
  schedule_interval='*/30 * * * *'
)


# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),
    schedule_interval=schedule_interval,
    start_date=start_date,
  )

  t1 = BashOperator(
      task_id='print_{}'.format(child_dag_name),
      bash_command='echo sub key_{}  `date` >> /home/etl/airflow/test.log'.format(child_dag_name),
      dag=dag)

  return dag

#
sub_dag_1 = SubDagOperator(
  subdag=sub_dag(PARENT_DAG_NAME, 'child_01', main_dag.start_date, main_dag.schedule_interval),
  task_id='child_01',
  dag=main_dag,
)

sub_dag_2 = SubDagOperator(
  subdag=sub_dag(PARENT_DAG_NAME, 'child_02', main_dag.start_date, main_dag.schedule_interval),
  task_id='child_02',
  dag=main_dag,
)

#
sub_dag_1 >> sub_dag_2

несколько файлов

Когда есть много SubDAG, лучше сохранить файл DAG в отдельном файле py. Структура файлового каталога выглядит следующим образом

img

Основной файл выглядит следующим образом

PS: Из-за ошибки, что модель не может быть найдена в процессе вызова других файлов в airflow, к основному файлу добавлен оператор, обрабатывающий путь. Это можно заменить, если есть лучший способ.

sys.path.append(os.path.abspath(os.path.dirname(__file__)))
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys, os
sys.path.append(os.path.abspath(os.path.dirname(__file__)))

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime

from sub.subdag import sub_dag

PARENT_DAG_NAME = 'atest_03'
CHILD_DAG_NAME = 'child_dag'

default_args = {
        'owner': '测试',
        'depends_on_past': False,
}

main_dag = DAG(
  dag_id=PARENT_DAG_NAME,
  default_args=default_args,
  description='测试-独立SubDAG',
  start_date=datetime(2018,4,14,19,0,0),
  schedule_interval='*/10 * * * *',
  catchup=False
)

sub_dag = SubDagOperator(
  subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                 main_dag.schedule_interval),
  task_id=CHILD_DAG_NAME,
  dag=main_dag,
)

Подфайлы следующие

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),
    schedule_interval=schedule_interval,
    start_date=start_date,
  )

  t1 = BashOperator(
      task_id='print_1',
      bash_command='echo sub 1 `date` >> /home/etl/airflow/test.log',
      dag=dag)

  return dag

Расписание и триггер

В DAG запуск задач определяется двумя основными параметрами:start_dateиschedule_interval. Первый раз, когда DAG запускается, это start_date + schedule_interval. Пример выглядит следующим образом:

start_date         2018-04-20 14:00:00 
schedule_interval  */30 * * * *

Этот первый триггер произойдет в 14:30, но задача 14:00 будет выполнена. В зависимости от того, когда активируется DAG, возникают разные триггеры.

  • Если время активации меньше, чем время первого запуска (например, активация в 14:10), первый запуск будет выполнен вовремя в 14:30.
  • Если время активации превышает время первого триггера (например, активация в 15:40), то согласно конфигурации catchup=True произойдет несколько операций обратной засыпки, т. е.Все вакансииЧасти запускаются последовательно, в частности задачи в 14:00, 14:30 и 15:00. Почему последний раз не в 15:30? Потому что в 15:30 этот момент времени на самом деле является одной задачей в 15:00.
  • Если время активации превышает время первого триггера (например, активация в 15:40), то согласно конфигурации catchup=Falseпоследняя вакансиядля запуска.

Подводя итог, если вы хотите запускать каждые 30 минут, а первый запуск происходит в 14:00, то set start_date должен быть 13:30:00, чтобы в 14:00 запустилась первая задача.

приложение

Изменить уровень журнала по умолчанию

В версии 1.8 УРОВЕНЬ РЕГИСТРАЦИИ нельзя настроить напрямую через файл cfg, поэтому эта функция реализована путем модификации исходного кода.

PS: В версиях после 1.9 говорят, что можно настраивать напрямую, не проверял.

vi /opt/anaconda3/lib/python3.5/site-packages/airflow/settings.py

# 修改此处代码,把默认的INFO修改成WARN即可
LOGGING_LEVEL = logging.WARN