- Оригинальный адрес:Python & Big Data: Airflow & Jupyter Notebook with Hadoop 3, Spark & Presto
- Оригинальный автор:Mark Litwintschik
- Перевод с:Программа перевода самородков
- Постоянная ссылка на эту статью:GitHub.com/rare earth/gold-no…
- Переводчик:cf020031308
- Корректор:yqian1991
В последние годы Python стал популярным языком программирования в области науки о данных, машинного обучения и глубокого обучения. Просто соедините его с языком запросов SQL, чтобы выполнить большую часть работы. SQL великолепен, вы можете давать инструкции на английском языке, и вам нужно только сказать, что вы хотите, вам не нужно заботиться о том, как это запрашивать. Это позволяет базовому механизму запросов оптимизировать SQL-запрос, не изменяя его. Python также великолепен, у него есть множество высококачественных библиотек, и им легко пользоваться.
Оркестровка работы — это акт выполнения и автоматизации рутинных задач. В прошлом это обычно делалось с помощью заданий CRON. В последние годы все больше и больше компаний стали использоватьApache AirflowиЛуиджи из Spotifyи т. д., чтобы создать более надежную систему. Эти инструменты могут отслеживать задания, регистрировать результаты и повторно запускать задания в случае сбоев. Если вам интересно, я написал пост в блоге с предысторией Airflow под названием«Построение конвейеров данных с помощью Airflow».
Блокноты как инструменты исследования и визуализации данных также стали очень популярными в мире данных за последние несколько лет. рисунокJupyter NotebookиApache ZeppelinТакие инструменты предназначены для удовлетворения этой потребности. Блокноты не только показывают вам результаты анализа, но также код и запросы, которые привели к этим результатам. Это хорошо для выявления упущений и помогает аналитикам воспроизвести работу друг друга.
Airflow и Jupyter Notebook хорошо работают вместе, вы можете использовать Airflow для автоматического ввода новых данных в базу данных, которые специалисты по данным затем могут анализировать с помощью Jupyter Notebook.
В этой записи блога я установлю Hadoop с одним узлом, запущу Jupyter Notebook и покажу, как создать задание Airflow, которое берет источник данных о погоде, сохраняет их в HDFS, преобразует в формат ORC и экспортирует. в электронную таблицу в формате Microsoft Excel.
Машина, которую я использую, имеет процессор Intel Core i5-4670K с тактовой частотой 3,40 ГГц, 12 ГБ ОЗУ и твердотельный накопитель на 200 ГБ. Я буду использовать новую установку Ubuntu 16.04.2 LTS и согласно моему сообщению в блогеHadoop 3: Руководство по установке одного узлаИнструкции в разделе «Сборка и установка одного узла Hadoop».
Установить зависимости
Далее зависимость от монтирования Ubuntu. git с GitHub для получения наборов данных о погоде, остальные три пакета — это сам Python, пакет Python и установщик пакета Python, изолирующий от окружающей среды.
$ sudo apt install \
git \
python \
python-pip \
virtualenv
Airflow будет полагаться на RabbitMQ, чтобы отслеживать свои задания. Затем установите Erlang, язык, на котором написан RabbitMQ.
$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang
Установите RabbitMQ ниже.
$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server
Далее будут установлены зависимости и приложения на Python, используемые в этом сообщении блога.
$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
apache-airflow \
celery \
cryptography \
jupyter \
jupyterthemes \
pyhive \
requests \
xlsxwriter
Настройка ноутбука Jupyter
Я создам папку для Jupyter для хранения его конфигурации, а затем установлю пароль для сервера. Если вы не установите пароль, вы получите длинный URL-адрес с ключом для доступа к веб-интерфейсу Jupyter. Ключ обновляется каждый раз при запуске Jupyter Notebook.
$ mkdir -p ~/.jupyter/
$ jupyter notebook password
Jupyter Notebook поддерживает темы пользовательского интерфейса. Следующая команда устанавливает тему наChesterish.
$ jt -t chesterish
Следующая команда выводит список установленных тем. Встроенные темы доступны на GitHub.Скриншоты.
$ jt -l
Чтобы вернуться к теме по умолчанию, выполните следующую команду.
$ jt -r
Запрос Spark с Jupyter Notebook
Сначала убедитесь, что вы используете Metastore Hive, службу Spark Master & Slaves и сервер Presto. Ниже приведены команды для запуска этих служб.
$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh
Следующее запустит блокнот Jupyter, чтобы вы могли взаимодействовать с PySpark, программным интерфейсом Spark на основе Python.
$ PYSPARK_DRIVER_PYTHON=ipython \
PYSPARK_DRIVER_PYTHON_OPTS="notebook
--no-browser
--ip=0.0.0.0
--NotebookApp.iopub_data_rate_limit=100000000" \
pyspark \
--master spark://ubuntu:7077
Обратите внимание, что указанный выше URL-адрес мастера имеет имя хоста ubuntu. Это имя хоста — это имя хоста, к которому привязан главный сервер Spark. Если вы не можете подключиться к Spark, проверьте в журналах главного сервера Spark имя хоста, к которому он выбрал для привязки, поскольку он не будет принимать подключения, адресованные другим именам хостов. Это может сбивать с толку, так как вы обычно ожидаете, что имя хоста, такое как localhost, будет работать в любом случае.
После запуска службы Jupyter Notebook используйте следующую команду, чтобы открыть веб-интерфейс.
$ open http://localhost:8888/
Вам будет предложено ввести пароль, который вы установили для Jupyter Notebook. После ввода в правом верхнем углу вы можете создать новый блокнот из раскрывающегося списка. Нас интересуют два типа блокнотов: Python и Terminal. Терминальные записные книжки предоставляют вам доступ к оболочке с использованием учетной записи UNIX, из которой вы запустили Jupyter Notebook. И я буду использовать блокнот Python.
После запуска блокнота Python вставьте следующий код в ячейку, и он будет запрашивать данные через Spark. Настройте запрос, чтобы использовать набор данных, созданный при установке.
cab_types = sqlContext.sql("""
SELECT cab_type, COUNT(*)
FROM trips_orc
GROUP BY cab_type
""")
cab_types.take(2)
Это результат вышеуказанного запроса. Была возвращена только одна запись, включая два поля.
[Row(cab_type=u'yellow', count(1)=20000000)]
Запрос Presto с помощью Jupyter Notebook
В записных книжках, которые ранее использовались для запроса Spark, также можно запрашивать Presto. Некоторые запросы Presto могут превосходить Spark по производительности, и хорошая новость заключается в том, что их можно переключать в одном блокноте. В приведенном ниже примере я использую DropboxPyHiveбиблиотека для запроса Presto.
from pyhive import presto
cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()
Вот частичный вывод вышеуказанного запроса.
[(451221840,
u'CMT',
u'2011-08-23 21:03:34.000',
u'2011-08-23 21:21:49.000',
u'N',
1,
-74.004655,
40.742162,
-73.973489,
40.792922,
...
Если вы хотите создавать графики данных в Jupyter Notebook, вы можете взглянуть наВизуализация данных с помощью SQLite в Jupyter NotebooksЭтот пост в блоге, поскольку он содержит несколько примеров построения с использованием SQL, может использоваться со Spark и Presto.
Начать воздушный поток
Далее будет создана папка ~/airflow, настроена база данных SQLite 3 для хранения состояния Airflow и наборов конфигурации, установленных в веб-интерфейсе, обновлена схема конфигурации и создана папка для кода задания Python, который Airflow будет запускать.
$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags
По умолчанию все веб-интерфейсы Presto, Spark и Airflow используют TCP-порт 8080. Если вы сначала запустите Spark, Presto не запустится. Но если вы запустите Spark после Presto, то Presto запустится на 8080, а главный сервер Spark будет использовать 8081, и если он все еще занят, он будет продолжать пробовать более высокие порты, пока не найдет свободный порт. После этого Spark выберет более высокий номер порта для веб-интерфейса Spark Worker. Это перекрытие обычно не является проблемой, потому что в рабочей среде эти службы обычно существуют на разных машинах.
Поскольку в этой установке используются TCP-порты 8080–8082, я запущу веб-интерфейс Airflow на порту 8083.
$ airflow webserver --port=8083 &
Я часто использую одну из следующих команд, чтобы узнать, какие сетевые порты используются.
$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu
Агент Celery Airflow и хранилище результатов заданий по умолчанию используют MySQL. Вместо этого здесь используется RabbitMQ.
$ vi ~/airflow/airflow.cfg
Найдите и отредактируйте следующие настройки.
broker_url = amqp://airflow:airflow@localhost:5672/airflow
celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow
В приведенном выше примере в качестве имени пользователя и пароля для подключения к RabbitMQ используется airflow. Пароль учетной записи можно настроить по желанию.
Далее будет настроен указанный выше пароль учетной записи для RabbitMQ, чтобы он мог получить доступ к виртуальному хосту Airflow.
$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
Подключить Airflow к Presto
Следующее откроет веб-интерфейс Airflow.
$ open http://localhost:8083/
Открыв веб-интерфейс Airflow, щелкните меню навигации «Администратор» вверху и выберите «Подключения». Вы увидите длинный список подключений к базе данных по умолчанию. Нажмите, чтобы изменить соединение Presto. Следующие изменения необходимы для подключения Airflow к Presto.
- Измените схему с улья на схему по умолчанию.
- Измените порт с 3400 на 8080.
Сохраните эти изменения, затем щелкните меню навигации «Профилирование данных» вверху и выберите «Специальный запрос». Выберите «presto_default» в раскрывающемся списке над окном запроса, и вы сможете выполнять код SQL через Presto. Ниже приведен пример запроса к набору данных, который я импортировал при установке.
SELECT count(*)
FROM trips_orc;
Скачать погость на DataSet
можетAirflow DAGРассматривается как плановая работа. В приведенном ниже примере я получу его на GitHub.FiveThirtyEight база данныхПредоставляет данные о погоде, импортирует их в HDFS, конвертирует из CSV в ORC и экспортирует из Presto в формат Microsoft Excel.
Следующее клонирует хранилище данных FiveThirtyEight в локальную папку с именем data.
$ git clone \
https://github.com/fivethirtyeight/data.git \
~/data
Затем я запущу Hive и создам две таблицы. Один для наборов данных в формате CSV, а другой для наборов данных в формате ORC, удобном для Presto и Spark.
$ hive
CREATE EXTERNAL TABLE weather_csv (
date_ DATE,
actual_mean_temp SMALLINT,
actual_min_temp SMALLINT,
actual_max_temp SMALLINT,
average_min_temp SMALLINT,
average_max_temp SMALLINT,
record_min_temp SMALLINT,
record_max_temp SMALLINT,
record_min_temp_year INT,
record_max_temp_year INT,
actual_precipitation DECIMAL(18,14),
average_precipitation DECIMAL(18,14),
record_precipitation DECIMAL(18,14)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/weather_csv/';
CREATE EXTERNAL TABLE weather_orc (
date_ DATE,
actual_mean_temp SMALLINT,
actual_min_temp SMALLINT,
actual_max_temp SMALLINT,
average_min_temp SMALLINT,
average_max_temp SMALLINT,
record_min_temp SMALLINT,
record_max_temp SMALLINT,
record_min_temp_year INT,
record_max_temp_year INT,
actual_precipitation DOUBLE,
average_precipitation DOUBLE,
record_precipitation DOUBLE
) STORED AS orc
LOCATION '/weather_orc/';
Создание воздушного потока DAG
Следующий код Python представляет собой задание Airflow (также известное какDAG). Каждые 30 минут он будет делать следующее.
- Удалите все существующие данные в папке /weather_csv/ на HDFS.
- Скопируйте файлы CSV из папки ~/data в папку /weather_csv/ на HDFS.
- Преобразование данных CSV на HDFS в формат ORC с помощью Hive.
- Экспорт данных в формате ORC в формат Microsoft Excel 2013 с помощью Presto.
В приведенном ниже коде Python есть точка для CSV, полный путь — /home/mark/data/us-weather-history/*.csv, замените «mark» на ваше собственное имя пользователя UNIX.
$ vi ~/airflow/dags/weather.py
from datetime import timedelta
import airflow
from airflow.hooks.presto_hook import PrestoHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import numpy as np
import pandas as pd
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0),
'email': ['airflow@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=15),
}
dag = airflow.DAG('weather',
default_args=default_args,
description='将天气数据复制到 HDFS 并导出为 Excel',
schedule_interval=timedelta(minutes=30))
cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
bash_command=cmd,
dag=dag)
cmd = """hdfs dfs -copyFromLocal \
/home/mark/data/us-weather-history/*.csv \
/weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
bash_command=cmd,
dag=dag)
cmd = """echo \"INSERT INTO weather_orc
SELECT * FROM weather_csv;\" | \
hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
bash_command=cmd,
dag=dag)
def presto_to_excel(**context):
column_names = [
"date",
"actual_mean_temp",
"actual_min_temp",
"actual_max_temp",
"average_min_temp",
"average_max_temp",
"record_min_temp",
"record_max_temp",
"record_min_temp_year",
"record_max_temp_year",
"actual_precipitation",
"average_precipitation",
"record_precipitation"
]
sql = """SELECT *
FROM weather_orc
LIMIT 20"""
ph = PrestoHook(catalog='hive',
schema='default',
port=8080)
data = ph.get_records(sql)
df = pd.DataFrame(np.array(data).reshape(20, 13),
columns=column_names)
writer = pd.ExcelWriter('weather.xlsx',
engine='xlsxwriter')
df.to_excel(writer, sheet_name='Sheet1')
writer.save()
return True
presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
provide_context=True,
python_callable=presto_to_excel,
dag=dag)
remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task
if __name__ == "__main__":
dag.cli()
Используйте этот код, чтобы открыть веб-интерфейс Airflow и переключить переключатель рядом с DAG «погода» в нижней части домашней страницы в положение «включено».
Планировщик создаст список заданий для выполнения работниками. Следующее запустит службу планировщика Airflow и работника, который выполнит все запланированные задания.
$ airflow scheduler &
$ airflow worker &
Спасибо, что нашли время прочитать эту статью. Я предоставляю консалтинговые, архитектурные и практические услуги по разработке клиентам в Северной Америке и Европе. Если вы заинтересованы в обсуждении того, как мои продукты могут помочь вашему бизнесу, свяжитесь с намиLinkedInСвяжитесь со мной.
Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.
Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.