Airflow стал стандартом де-факто для систем планирования задач, используя тот же код и задачи настройки, что и terraform.
Airflow использует Python в качестве языка разработки, который очень прост в изучении и использовании.
Полный код кейса выложен на github:GitHub.com/аккуратная жизнь/нет…
Получить экземпляр воздушного потока
Вы можете использовать докер, чтобы начать одним щелчком мыши
git clone https://github.com/puckel/docker-airflow
cd docker-airflow
docker-compose -f docker-compose-LocalExecutor.yml up -d
Посетите ip:8080, чтобы увидеть эффект
Вы можете видеть, что воздушный поток уже доступен
отредактировать даг-файл
Этот файл dag используется для определения последовательности и зависимостей между задачами и задачами.
настройки параметров
Ссылаться на:airflow.apache.org/tutorial.contracts…
Ниже приведены некоторые из наиболее важных параметров:
имя параметра | эффект |
---|---|
start_date | время начала задачи |
end_date | Время окончания задания, если не заполнено, значит навсегда |
retries | Количество повторных попыток для сбоев выполнения задачи |
retry_delay | интервал повтора |
Код реализован следующим образомvim mydag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"start_date": datetime(2019, 7, 10),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))
Приведенный выше «schedule_interval=timedelta(1)» означает запуск этой задачи один раз в день, вы также можете использовать синтаксис crontab, см.airflow.apache.org/scheduler Также…
Этот новый файл dag необходимо импортировать с помощью python для выполнения этого скрипта, который можно импортировать с помощью следующей команды.
docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash
python dags/mydag.py
Обновите веб-интерфейс, чтобы увидеть недавно добавленную задачу mydag.
Загрузка задачи этого воздушного потока происходит относительно медленно, если статус отображения отличается от вышеприведенного, необходимо некоторое время подождать.Определить задачи кейса
Здесь определены три задачи, эхо 1, 2 и 3 соответственно, взаимосвязь следующая
Код реализован следующим образом
Определить содержимое выполнения задачи
t1 = BashOperator(task_id="echo1", bash_command="echo 1", dag=dag)
t2 = BashOperator(task_id="echo2", bash_command="echo 2", dag=dag)
t3 = BashOperator(task_id="echo3", bash_command="echo 3", dag=dag)
Вышеупомянутое заставляет задачу использовать bash, выполняемую воздушным потоком, воздушный поток также может выполнять много ввода, ссылка на полный список:airflow.apache.org/_API/люблю грудь…
Определение межзадачных отношений
t2.set_upstream(t1)
t3.set_upstream(t1)
Загрузите измененный файл оркестровки задач воздушного потока.
Измените файл dag в первый раз, воздушный поток будет загружен автоматически, нажмите кнопку «Обновить», чтобы вручную загрузить файл конфигурации.
Эффект операции следующий
Вы можете видеть, что зависимости echo1, echo2, echo3 определены успешно
начать задачу
Сначала включите эту задачу, а затем нажмите кнопку «Выполнить», эффект операции будет следующим.
Вы также можете только включить эту задачу, а затем дождаться автоматического запуска воздушного потока в соответствии с установленным временем.
Вы можете видеть, что задача начала выполнятьсяПосмотреть статус выполнения задачи
Статус выполнения этой задачи представлен разными цветами
Вы можете видеть, что все задачи были успешно выполненыПросмотр результатов выполнения задачи
Нажмите «Браузер» > «Экземпляр задачи», чтобы просмотреть список всех выполненных задач.
Щелкните идентификатор конкретной задачи, чтобы перейти на страницу сведений о выполнении задачи. Нажмите «Журнал» на странице сведений, чтобы просмотреть вывод журнала задачи. Эффект операции следующий:
Просмотр результатов рендеринга шаблона jinjia
Если вы используете функцию шаблона воздушного потока, вы можете просмотреть результат рендеринга шаблона выполненной задачи на странице сведений о выполнении задачи.Эффект операции следующий:
удалить задачу
Вам необходимо удалить файл определения dag, например rm dags/mydag.py, а затем удалить его в веб-интерфейсе.
некоторые замечания
Этот воздушный поток зависит от многого, потому что до версии 1.0 есть некоторые ошибки, рекомендуется использовать докер для запуска, основная функция планирования задач dag в порядке, другие функции можно понять в первую очередь.
airflow может использовать функцию шаблона для создания сценариев, см.:airflow.apache.org/tutorial.contracts…
Справочник по синтаксису шаблона Jinjia:jinja.pocoo.org/docs/dev/
Airflow работает в часовом поясе UTC по умолчанию.Если вам нужно рассчитать правильное время, вам нужно преобразовать время в часовой пояс.Основной код выглядит следующим образом
#将本地时间转换为utc时间,再设置为start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)