Простая практика и принцип pyspark

задняя часть Python pyspider

В этой статье описаны основы использования и важное понимание pyspark, и я надеюсь, что она будет полезна всем.

Установить

Поскольку мы предназначены для учебных целей, нам не нужно загружать сервер spark для его сборки.

Непосредственно устанавливая пакет pyspark для python, он содержит SDK, который взаимодействует со spark и средой моделирования spark на одной машине:

pip3 install pyspark

Если у вас несколько сред Python, вам нужно обратить внимание на настройку переменных среды, чтобы указать серверу spark (вот встроенный сервер) использовать правильную версию Python:

export PYSPARK_PYTHON=/usr/local/bin/python3

Основные понятия

RDD

Представляет собой набор данных, который не загружается локально, когда мы пишем искровые программы, это просто логическое понятие.

Spark может генерировать RDD из локального диска, HDFS и др. Мы считаем, что объект RDD в настоящее время является просто идентификатором, и на самом деле нет необходимости читать файл HDFS.

2 вида операторов

Написание искровых программ будет использовать два типа функций, называемых двумя типами операторов:

  • Преобразование: например, сопоставление, фильтрация и т. д., вычисление кластера не будет запускаться напрямую при вызове.В настоящее время клиент только строит взаимосвязь вашей вычислительной топологии для формирования дерева вызовов итераций с несколькими раундами.
  • Действие: Например, при вызове сохранения, сбора, подсчета, суммирования и т. д. будет запущено вычисление реального кластера, а топология вычисления будет взята из нескольких предыдущих вызовов преобразования.

кеш-кэш

Spark не выполняет расчеты планирования для всей топологии, пока не будет вызвано действие.

Основным принципом Spark RDD является неизменность, и мы знаем, что RDD, как правило, представляет собой крупномасштабный распределенный набор данных, поэтому spark не вычисляет выходной RDD первого раунда преобразования, как мы себе представляли, и сохраняет его, а затем использует как второй раунд.Ввод преобразования будет занимать много ресурсов памяти, чего в принципе невозможно достичь.

Проще говоря, spark — это потоковые вычисления, а RDD-вычисления передаются между разными операторами, а не предыдущий раунд вычисляется перед следующим.

Следовательно, если один из наших RDD необходимо повторно использовать в нескольких вычислительных ветвях, мы можем рассмотреть возможность активного запроса кэширования RDD. После запроса кэша, когда RDD вычисляется в первый раз, записи в RDD фактически сохраняются в памяти/на диске для повторного использования другими вычислительными ветвями.

Поэтому все обращают внимание, что кеш должен вызываться перед действием, чтобы он помог нам где-то закэшировать RDD во время фактического расчета.

Восстановление

Я чувствую, что кеш легко спутать с отработкой отказа искры.

Когда spark повторяет нашу топологию расчета преобразования, если определенный раунд RDD поврежден из-за сбоя в работе машины в середине, то spark пересчитывает поврежденный RDD из последнего вышестоящего RDD, чтобы восстановить работу.

Мы не используем кеш, чтобы ускорить восстановление после сбоя. Восстановление после сбоя — это встроенная функция spark. Единственной причиной использования кеша должно быть повторное использование наборов результатов в нескольких вычислительных ветвях.

КПП

Личное понимание - это более надежный кэш, все-таки для мультиплексирования RDD.

Потому что кэш кэширует данные RDD в смешанном режиме памяти и диска, но их все равно приходится пересчитывать в случае сбоев отдельных машин.

Таким образом, spark может вызывать контрольную точку для сохранения RDD непосредственно в HDFS, полагаясь на HFDS для обеспечения высокой надежности.

Контрольная точка также является преобразованием, которое выполняется немедленно, когда оно не вызывается, но по умолчанию контрольная точка один раз пересчитывает RDD и выводит HDFS.

Spark официально рекомендует, если вы хотите использовать контрольную точку, рекомендуется использовать ее с кешем, чтобы при выполнении действия результат сначала записывался в кеш, а затем чекпойнт напрямую записывал содержимое кеша в HDFS, что позволяет избежать необходимости в контрольной точке для двойного вычисления RDD.

раздел раздел

Если RDD загружает файл HDFS, при вычислении большого файла необходимо учитывать распараллеливание.

Следовательно, RDD нужно разбить, то есть разбить на разделы, чтобы каждый раздел можно было вычислить параллельно.

Клиент позволяет нам указать, что RDD разделен на несколько разделов, например, нижний слой распределенной файловой системы, такой как HDFS, хранится в блоках, которые сами по себе могут поддерживать нарезку данных.

перемешивать

Учащиеся, которые выполняли map-reduce, должны понимать, что перемешивание — это процесс агрегации, сортировки и передачи данных между map и reduce.

То же самое верно и для нескольких операторов преобразования в spark.Если вычисленный результат каждой секции не может продолжаться для завершения следующего раунда вычислений в секции, его необходимо перетасовать на другие вычислительные узлы для дальнейшей агрегации (например, операции сопоставления).

При написании spark вам нужно уделить особое внимание уменьшению количества перетасовок, уменьшить объем передачи в случайном порядке, попытаться выполнить несколько раундов вычислений в текущем разделе и попытаться уменьшить требуемую величину перетасовки.

упражняться

Тестовые данные

У меня есть локальный файл в качестве исходного ввода RDD со следующим содержимым:

cat input.txt
a b c
d a e
f g h c b

Инициализировать кадр

# -*- coding: utf-8 -*-
 
## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext, StorageLevel
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    pass
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

  • SparkConf: настройте клиент, в основном адрес сервера (local представляет собой имитацию режима искрового кластера) и имя приложения.
  • SparkContext: входящая конфигурация, которая является входным объектом для работы с искрой.

Код, который мы напишем дальше, помещается в функцию с именем main (не требуется spark), которая по-прежнему является пустой функцией, то есть вычислительной задачи нет.

широковещательная переменная

# -*- coding: utf-8 -*-
 
## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext, StorageLevel
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

Spark поддерживает распространение некоторой общей информации о классе конфигурации в кластер, что может быть достигнуто путем широковещательной рассылки переменных с помощью широковещательной рассылки.

Его параметром является любой объект python, а возвращаемым значением является идентификатор операции нашего последующего доступа.Как обращаться к широковещательным переменным, мы продемонстрируем позже в коде расчета.

глобальная переменная счетчика

## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)

Через аккумулятор можно создать переменную с глобальным общим счетчиком, которую можно использовать для распределенного накопительного счета.

Моя цель — подсчитать количество разных слов, конкретное использование мы увидим позже.

Создание RDD

Указываем, что RDD создается с локального диска.Обратите внимание, что эта операция не считывает файл сразу, а только идентификатор.

## Closure Functions
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)

выполнить плоскую карту

Я хочу объявить первый оператор преобразования, разделить каждую строку в файле на слова пробелами и сгенерировать набор RDD (слово, 1), указывающий, что слово появляется один раз.

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
    
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))

flatMap требует, чтобы метод обработки возвращал массив, то есть от 1 строки до N слов.

Мы можем отлаживать rdd через toDebugString rdd и наблюдать за его топологией:

(8) PythonRDD[2] at RDD at PythonRDD.scala:49 []
 |  /Users/liangdong/Documents/github/spark/py-demo/input.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  /Users/liangdong/Documents/github/spark/py-demo/input.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []

Можно видеть, что текущий RDD называется MapPartitionRDD, который происходит от HadoopRDD, который представляет собой RDD, созданный нашим последним циклом textFile.

кэш RDD

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存

Вызов cache может кэшировать rdd, т. к. я буду использовать этот набор в двух вычислительных ветках в дальнейшем.Чтобы переиспользовать и избежать возможных повторных вычислений, я явно указываю кеш.

Кэш — это чистый кеш памяти.Нижний слой реализуется методом persist.Мы можем напрямую вызвать persist для указания других режимов кеша, чтобы избежать возникновения нехватки памяти.Я не буду это здесь демонстрировать.

выполнить подсчет

Мы знаем, что count — это оператор действия, и распределенные вычисления будут инициированы немедленно.

## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)

Выводим общее количество слов во всем файле.

总单词数量: 11

уменьшить агрегаты

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())

Агрегируйте по первому столбцу в words_rdd в качестве ключа и выполните накопление сокращения, чтобы получить количество вхождений каждого слова.

单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)]

reduceByKey похож на map+reduce в map-reduce, Spark выполнит объединение на стороне сопоставления и слияние на стороне редьюсера, что означает, что перемешивание будет оптимизировано по умолчанию.

Большинство операторов преобразования могут передавать параметр numParitions, то есть СДР, сгенерированный после вычисления, должен использовать несколько разделов, и этому вопросу следует уделить особое внимание.

Кроме того, специально оговорено, что именно благодаря тому, что мы выполняем оператор collect, может производиться вычисление reduceByKey, в противном случае мы все еще находимся на стадии описания топологии.

В то же время мы повторно используем кэшированные слова words_rdd, что особенно важно для вычислений с большими объемами данных.

Доступ к общим данным

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
def addUnit(item):
    all_words_count.add(1)
 
    return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"]))
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    global broadcast_vals
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    global all_words_count
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())
 
    # 给统计结果增加"计数单位"
    detail_word_count = per_word_count.map(addUnit)
 
    print("详细统计:", detail_word_count.collect())
 
    print("去重单词数量:", all_words_count.value)

Затем мы выполняем операцию отображения на per_word_count.

map заключается в выполнении функции вычисления для каждого элемента в RDD, а возвращаемое значение заменяет исходный элемент.

В функции addUnit мы получаем доступ к единице в широковещательной переменной и добавляем ее к количеству вхождений слова. Кроме того, мы накапливаем количество вхождений разных слов на основе переменной аккумулятора.

单词统计: [('b', 2), ('c', 2), ('g', 1), ('a', 2), ('e', 1), ('d', 1), ('h', 1), ('f', 1)]
详细统计: [('b', '2次'), ('c', '2次'), ('g', '1次'), ('a', '2次'), ('e', '1次'), ('d', '1次'), ('h', '1次'), ('f', '1次')]
去重单词数量: 8

Сортировка и вывод в файл

## Closure Functions
def split(line):
    words = []
    for word in line.split(" "):
        words.append((word, 1))
    return words
 
def count(left_val, right_val):
    return left_val + right_val
 
def addUnit(item):
    all_words_count.add(1)
 
    return (item[0], "{}{}".format(item[1], broadcast_vals.value["unit"]))
 
def sort_key(item):
    return item[1]
 
## Main functionality
 
def main(sc):
    # 广播到集群所有节点共享的数据
    global broadcast_vals
    broadcast_vals = sc.broadcast({"unit": "次"})
 
    # 制造一个全局共享统计型变量
    global all_words_count
    all_words_count = sc.accumulator(0)
 
    # 从本地文件创建RDD
    raw_rdd = sc.textFile("/Users/liangdong/Documents/github/spark/py-demo/input.txt", 6)
 
    # 文本行RDD -> 词频RDD
    words_rdd = raw_rdd.flatMap(split)
 
    # 打印一下words_rdd的拓扑关系
    print(words_rdd.toDebugString().decode('utf-8'))
 
    # 缓存words_rdd,用于后续的2个分支计算复用
    words_rdd.cache()    # 也可以用persist配置内存+磁盘混合缓存
 
    # 计算总共有多少单词
    total_words = words_rdd.count()
 
    # 打印总单词数量
    print("总单词数量:", total_words)
 
    # 在每个partition内做聚合
    per_word_count = words_rdd.reduceByKey(count, 4)
 
    print("单词统计:", per_word_count.collect())
 
    # 给统计结果增加"计数单位"
    detail_word_count = per_word_count.map(addUnit)
 
    print("详细统计:", detail_word_count.collect())
 
    print("去重单词数量:", all_words_count.value)
 
    # 按出现次数排序
    sorted_rdd = per_word_count.sortBy(sort_key, False)
 
    # 写到文件中
    sorted_rdd.saveAsTextFile("/Users/liangdong/Documents/github/spark/py-demo/output")

Теперь мы вызываем sortBy для сортировки per_word_count, первый параметр возвращает поле для сортировки, а второй параметр указывает порядок убывания.

Наконец, мы записываем коллекцию в файл, обычно это путь к файлу в HDFS.

output/
total 16
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 _SUCCESS
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 part-00000
-rw-r--r--  1 liangdong  staff  27  7  3 13:11 part-00001
-rw-r--r--  1 liangdong  staff   0  7  3 13:11 part-00002
-rw-r--r--  1 liangdong  staff  45  7  3 13:11 part-00003
liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00001
('b', 2)
('c', 2)
('a', 2)
liangdongs-MacBook-Pro:py-demo liangdong$ cat output/part-00003
('g', 1)
('e', 1)
('d', 1)
('h', 1)
('f', 1)

Суммировать

Вышеупомянутое — это просто метод программирования для пакетных вычислений spark.

На самом деле, spark также поддерживает потоковые вычисления, и я кратко расскажу об этом, когда у меня будет время.

Поскольку программирование в Spark ничем не отличается от разработки автономных программ, но на самом деле это распределенные параллельные вычисления на распределенных больших наборах данных, поэтому в Spark есть множество библиотек Machine Leanring, разработанных для того, чтобы легко помочь вам решить задачи машинного обучения с большими данными.

Кроме того, стоит упомянуть идею программирования, воплощенную в Spark, а именно: перемещение вычислений в хранилище.

Поскольку и java, и python поддерживают сериализацию объектов, код вычислений можно сериализовать и отправлять на узлы хранения для участия в распределенных вычислениях.Это важная причина, по которой большинство платформ с открытым исходным кодом для больших данных поддерживают только языки на основе JVM.

Ссылаться на