Python и большие данные: Airflow, Jupyter Notebook и Hadoop 3, Spark, Presto

Python Программа перевода самородков Hadoop Spark

В последние годы Python стал популярным языком программирования в области науки о данных, машинного обучения и глубокого обучения. Просто соедините его с языком запросов SQL, чтобы выполнить большую часть работы. SQL великолепен, вы можете давать инструкции на английском языке, и вам нужно только сказать, что вы хотите, вам не нужно заботиться о том, как это запрашивать. Это позволяет базовому механизму запросов оптимизировать SQL-запрос, не изменяя его. Python также великолепен, у него есть множество высококачественных библиотек, и им легко пользоваться.

Оркестровка работы — это акт выполнения и автоматизации рутинных задач. В прошлом это обычно делалось с помощью заданий CRON. В последние годы все больше и больше компаний стали использоватьApache AirflowиЛуиджи из Spotifyи т. д., чтобы создать более надежную систему. Эти инструменты могут отслеживать задания, регистрировать результаты и повторно запускать задания в случае сбоев. Если вам интересно, я написал пост в блоге с предысторией Airflow под названием«Построение конвейеров данных с помощью Airflow».

Блокноты как инструменты исследования и визуализации данных также стали очень популярными в мире данных за последние несколько лет. рисунокJupyter NotebookиApache ZeppelinТакие инструменты предназначены для удовлетворения этой потребности. Блокноты не только показывают вам результаты анализа, но также код и запросы, которые привели к этим результатам. Это хорошо для выявления упущений и помогает аналитикам воспроизвести работу друг друга.

Airflow и Jupyter Notebook хорошо работают вместе, вы можете использовать Airflow для автоматического ввода новых данных в базу данных, которые специалисты по данным затем могут анализировать с помощью Jupyter Notebook.

В этой записи блога я установлю Hadoop с одним узлом, запущу Jupyter Notebook и покажу, как создать задание Airflow, которое берет источник данных о погоде, сохраняет их в HDFS, преобразует в формат ORC и экспортирует. в электронную таблицу в формате Microsoft Excel.

Машина, которую я использую, имеет процессор Intel Core i5-4670K с тактовой частотой 3,40 ГГц, 12 ГБ ОЗУ и твердотельный накопитель на 200 ГБ. Я буду использовать новую установку Ubuntu 16.04.2 LTS и согласно моему сообщению в блогеHadoop 3: Руководство по установке одного узлаИнструкции в разделе «Сборка и установка одного узла Hadoop».

Установить зависимости

Далее зависимость от монтирования Ubuntu. git с GitHub для получения наборов данных о погоде, остальные три пакета — это сам Python, пакет Python и установщик пакета Python, изолирующий от окружающей среды.

$ sudo apt install \
    git \
    python \
    python-pip \
    virtualenv

Airflow будет полагаться на RabbitMQ, чтобы отслеживать свои задания. Затем установите Erlang, язык, на котором написан RabbitMQ.

$ echo "deb http://binaries.erlang-solutions.com/debian xenial contrib" | \
    sudo tee /etc/apt/sources.list.d/erlang.list
$ wget -O - http://binaries.erlang-solutions.com/debian/erlang_solutions.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install esl-erlang

Установите RabbitMQ ниже.

$ echo "deb https://dl.bintray.com/rabbitmq/debian xenial main" | \
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | \
    sudo apt-key add -
$ sudo apt update
$ sudo apt install rabbitmq-server

Далее будут установлены зависимости и приложения на Python, используемые в этом сообщении блога.

$ virtualenv .notebooks
$ source .notebooks/bin/activate
$ pip install \
    apache-airflow \
    celery \
    cryptography \
    jupyter \
    jupyterthemes \
    pyhive \
    requests \
    xlsxwriter

Настройка ноутбука Jupyter

Я создам папку для Jupyter для хранения его конфигурации, а затем установлю пароль для сервера. Если вы не установите пароль, вы получите длинный URL-адрес с ключом для доступа к веб-интерфейсу Jupyter. Ключ обновляется каждый раз при запуске Jupyter Notebook.

$ mkdir -p ~/.jupyter/
$ jupyter notebook password

Jupyter Notebook поддерживает темы пользовательского интерфейса. Следующая команда устанавливает тему наChesterish.

$ jt -t chesterish

Следующая команда выводит список установленных тем. Встроенные темы доступны на GitHub.Скриншоты.

$ jt -l

Чтобы вернуться к теме по умолчанию, выполните следующую команду.

$ jt -r

Запрос Spark с Jupyter Notebook

Сначала убедитесь, что вы используете Metastore Hive, службу Spark Master & Slaves и сервер Presto. Ниже приведены команды для запуска этих служб.

$ hive --service metastore &
$ sudo /opt/presto/bin/launcher start
$ sudo /opt/spark/sbin/start-master.sh
$ sudo /opt/spark/sbin/start-slaves.sh

Следующее запустит блокнот Jupyter, чтобы вы могли взаимодействовать с PySpark, программным интерфейсом Spark на основе Python.

$ PYSPARK_DRIVER_PYTHON=ipython \
    PYSPARK_DRIVER_PYTHON_OPTS="notebook
        --no-browser
        --ip=0.0.0.0
        --NotebookApp.iopub_data_rate_limit=100000000" \
    pyspark \
    --master spark://ubuntu:7077

Обратите внимание, что указанный выше URL-адрес мастера имеет имя хоста ubuntu. Это имя хоста — это имя хоста, к которому привязан главный сервер Spark. Если вы не можете подключиться к Spark, проверьте в журналах главного сервера Spark имя хоста, к которому он выбрал для привязки, поскольку он не будет принимать подключения, адресованные другим именам хостов. Это может сбивать с толку, так как вы обычно ожидаете, что имя хоста, такое как localhost, будет работать в любом случае.

После запуска службы Jupyter Notebook используйте следующую команду, чтобы открыть веб-интерфейс.

$ open http://localhost:8888/

Вам будет предложено ввести пароль, который вы установили для Jupyter Notebook. После ввода в правом верхнем углу вы можете создать новый блокнот из раскрывающегося списка. Нас интересуют два типа блокнотов: Python и Terminal. Терминальные записные книжки предоставляют вам доступ к оболочке с использованием учетной записи UNIX, из которой вы запустили Jupyter Notebook. И я буду использовать блокнот Python.

После запуска блокнота Python вставьте следующий код в ячейку, и он будет запрашивать данные через Spark. Настройте запрос, чтобы использовать набор данных, созданный при установке.

cab_types = sqlContext.sql("""
 SELECT cab_type, COUNT(*)
 FROM trips_orc
 GROUP BY cab_type
""")

cab_types.take(2)

Это результат вышеуказанного запроса. Была возвращена только одна запись, включая два поля.

[Row(cab_type=u'yellow', count(1)=20000000)]

Запрос Presto с помощью Jupyter Notebook

В записных книжках, которые ранее использовались для запроса Spark, также можно запрашивать Presto. Некоторые запросы Presto могут превосходить Spark по производительности, и хорошая новость заключается в том, что их можно переключать в одном блокноте. В приведенном ниже примере я использую DropboxPyHiveбиблиотека для запроса Presto.

from pyhive import presto

cursor = presto.connect('0.0.0.0').cursor()
cursor.execute('SELECT * FROM trips_orc LIMIT 10')
cursor.fetchall()

Вот частичный вывод вышеуказанного запроса.

[(451221840,
  u'CMT',
  u'2011-08-23 21:03:34.000',
  u'2011-08-23 21:21:49.000',
  u'N',
  1,
  -74.004655,
  40.742162,
  -73.973489,
  40.792922,
...

Если вы хотите создавать графики данных в Jupyter Notebook, вы можете взглянуть наВизуализация данных с помощью SQLite в Jupyter NotebooksЭтот пост в блоге, поскольку он содержит несколько примеров построения с использованием SQL, может использоваться со Spark и Presto.

Начать воздушный поток

Далее будет создана папка ~/airflow, настроена база данных SQLite 3 для хранения состояния Airflow и наборов конфигурации, установленных в веб-интерфейсе, обновлена ​​схема конфигурации и создана папка для кода задания Python, который Airflow будет запускать.

$ cd ~
$ airflow initdb
$ airflow upgradedb
$ mkdir -p ~/airflow/dags

По умолчанию все веб-интерфейсы Presto, Spark и Airflow используют TCP-порт 8080. Если вы сначала запустите Spark, Presto не запустится. Но если вы запустите Spark после Presto, то Presto запустится на 8080, а главный сервер Spark будет использовать 8081, и если он все еще занят, он будет продолжать пробовать более высокие порты, пока не найдет свободный порт. После этого Spark выберет более высокий номер порта для веб-интерфейса Spark Worker. Это перекрытие обычно не является проблемой, потому что в рабочей среде эти службы обычно существуют на разных машинах.

Поскольку в этой установке используются TCP-порты 8080–8082, я запущу веб-интерфейс Airflow на порту 8083.

$ airflow webserver --port=8083 &

Я часто использую одну из следующих команд, чтобы узнать, какие сетевые порты используются.

$ sudo lsof -OnP | grep LISTEN
$ netstat -tuplen
$ ss -lntu

Агент Celery Airflow и хранилище результатов заданий по умолчанию используют MySQL. Вместо этого здесь используется RabbitMQ.

$ vi ~/airflow/airflow.cfg

Найдите и отредактируйте следующие настройки.

broker_url = amqp://airflow:airflow@localhost:5672/airflow

celery_result_backend = amqp://airflow:airflow@localhost:5672/airflow

В приведенном выше примере в качестве имени пользователя и пароля для подключения к RabbitMQ используется airflow. Пароль учетной записи можно настроить по желанию.

Далее будет настроен указанный выше пароль учетной записи для RabbitMQ, чтобы он мог получить доступ к виртуальному хосту Airflow.

$ sudo rabbitmqctl add_vhost airflow
$ sudo rabbitmqctl add_user airflow airflow
$ sudo rabbitmqctl set_user_tags airflow administrator
$ sudo rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"

Подключить Airflow к Presto

Следующее откроет веб-интерфейс Airflow.

$ open http://localhost:8083/

Открыв веб-интерфейс Airflow, щелкните меню навигации «Администратор» вверху и выберите «Подключения». Вы увидите длинный список подключений к базе данных по умолчанию. Нажмите, чтобы изменить соединение Presto. Следующие изменения необходимы для подключения Airflow к Presto.

  • Измените схему с улья на схему по умолчанию.
  • Измените порт с 3400 на 8080.

Сохраните эти изменения, затем щелкните меню навигации «Профилирование данных» вверху и выберите «Специальный запрос». Выберите «presto_default» в раскрывающемся списке над окном запроса, и вы сможете выполнять код SQL через Presto. Ниже приведен пример запроса к набору данных, который я импортировал при установке.

SELECT count(*)
FROM trips_orc;

Скачать погость на DataSet

можетAirflow DAGРассматривается как плановая работа. В приведенном ниже примере я получу его на GitHub.FiveThirtyEight база данныхПредоставляет данные о погоде, импортирует их в HDFS, конвертирует из CSV в ORC и экспортирует из Presto в формат Microsoft Excel.

Следующее клонирует хранилище данных FiveThirtyEight в локальную папку с именем data.

$ git clone \
    https://github.com/fivethirtyeight/data.git \
    ~/data

Затем я запущу Hive и создам две таблицы. Один для наборов данных в формате CSV, а другой для наборов данных в формате ORC, удобном для Presto и Spark.

$ hive
CREATE EXTERNAL TABLE weather_csv (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DECIMAL(18,14),
    average_precipitation DECIMAL(18,14),
    record_precipitation  DECIMAL(18,14)
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/weather_csv/';

CREATE EXTERNAL TABLE weather_orc (
    date_                 DATE,
    actual_mean_temp      SMALLINT,
    actual_min_temp       SMALLINT,
    actual_max_temp       SMALLINT,
    average_min_temp      SMALLINT,
    average_max_temp      SMALLINT,
    record_min_temp       SMALLINT,
    record_max_temp       SMALLINT,
    record_min_temp_year  INT,
    record_max_temp_year  INT,
    actual_precipitation  DOUBLE,
    average_precipitation DOUBLE,
    record_precipitation  DOUBLE
) STORED AS orc
  LOCATION '/weather_orc/';

Создание воздушного потока DAG

Следующий код Python представляет собой задание Airflow (также известное какDAG). Каждые 30 минут он будет делать следующее.

  • Удалите все существующие данные в папке /weather_csv/ на HDFS.
  • Скопируйте файлы CSV из папки ~/data в папку /weather_csv/ на HDFS.
  • Преобразование данных CSV на HDFS в формат ORC с помощью Hive.
  • Экспорт данных в формате ORC в формат Microsoft Excel 2013 с помощью Presto.

В приведенном ниже коде Python есть точка для CSV, полный путь — /home/mark/data/us-weather-history/*.csv, замените «mark» на ваше собственное имя пользователя UNIX.

$ vi ~/airflow/dags/weather.py
from datetime import timedelta

import airflow
from   airflow.hooks.presto_hook         import PrestoHook
from   airflow.operators.bash_operator   import BashOperator
from   airflow.operators.python_operator import PythonOperator
import numpy  as np
import pandas as pd


default_args = {
    'owner':            'airflow',
    'depends_on_past':  False,
    'start_date':       airflow.utils.dates.days_ago(0),
    'email':            ['airflow@example.com'],
    'email_on_failure': True,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=15),
}

dag = airflow.DAG('weather',
                  default_args=default_args,
                  description='将天气数据复制到 HDFS 并导出为 Excel',
                  schedule_interval=timedelta(minutes=30))

cmd = "hdfs dfs -rm /weather_csv/*.csv || true"
remove_csvs_task = BashOperator(task_id='remove_csvs',
                                bash_command=cmd,
                                dag=dag)

cmd = """hdfs dfs -copyFromLocal \
            /home/mark/data/us-weather-history/*.csv \
            /weather_csv/"""
csv_to_hdfs_task = BashOperator(task_id='csv_to_hdfs',
                                bash_command=cmd,
                                dag=dag)

cmd = """echo \"INSERT INTO weather_orc
                SELECT * FROM weather_csv;\" | \
            hive"""
csv_to_orc_task = BashOperator(task_id='csv_to_orc',
                               bash_command=cmd,
                               dag=dag)


def presto_to_excel(**context):
    column_names = [
        "date",
        "actual_mean_temp",
        "actual_min_temp",
        "actual_max_temp",
        "average_min_temp",
        "average_max_temp",
        "record_min_temp",
        "record_max_temp",
        "record_min_temp_year",
        "record_max_temp_year",
        "actual_precipitation",
        "average_precipitation",
        "record_precipitation"
    ]

    sql = """SELECT *
             FROM weather_orc
             LIMIT 20"""

    ph = PrestoHook(catalog='hive',
                    schema='default',
                    port=8080)
    data = ph.get_records(sql)

    df = pd.DataFrame(np.array(data).reshape(20, 13),
                      columns=column_names)

    writer = pd.ExcelWriter('weather.xlsx',
                            engine='xlsxwriter')
    df.to_excel(writer, sheet_name='Sheet1')
    writer.save()

    return True

presto_to_excel_task = PythonOperator(task_id='presto_to_excel',
                                      provide_context=True,
                                      python_callable=presto_to_excel,
                                      dag=dag)

remove_csvs_task >> csv_to_hdfs_task >> csv_to_orc_task >> presto_to_excel_task

if __name__ == "__main__":
    dag.cli()

Используйте этот код, чтобы открыть веб-интерфейс Airflow и переключить переключатель рядом с DAG «погода» в нижней части домашней страницы в положение «включено».

Планировщик создаст список заданий для выполнения работниками. Следующее запустит службу планировщика Airflow и работника, который выполнит все запланированные задания.

$ airflow scheduler &
$ airflow worker &

Спасибо, что нашли время прочитать эту статью. Я предоставляю консалтинговые, архитектурные и практические услуги по разработке клиентам в Северной Америке и Европе. Если вы заинтересованы в обсуждении того, как мои продукты могут помочь вашему бизнесу, свяжитесь с намиLinkedInСвяжитесь со мной.

Если вы обнаружите ошибки в переводе или в других областях, требующих доработки, добро пожаловать наПрограмма перевода самородковВы также можете получить соответствующие бонусные баллы за доработку перевода и PR. начало статьиПостоянная ссылка на эту статьюЭто ссылка MarkDown этой статьи на GitHub.


Программа перевода самородковэто сообщество, которое переводит высококачественные технические статьи из Интернета сНаггетсДелитесь статьями на английском языке на . Охват контентаAndroid,iOS,внешний интерфейс,задняя часть,блокчейн,продукт,дизайн,искусственный интеллектЕсли вы хотите видеть более качественные переводы, пожалуйста, продолжайте обращать вниманиеПрограмма перевода самородков,официальный Вейбо,Знай колонку.