Создайте среду PySpark
Сначала убедитесь, что установлен python 2.7, настоятельно рекомендуется использоватьVirtualenvУдобно управлять средой python. Затем установите pyspark через pip
pip install pyspark
Файл относительно большой, около 180 Мб, так что наберитесь терпения.
Загрузите spark 2.2.0, распакуйте его в определенный каталог и установите SPARK_HOME.
На самом деле, если вы отправляете программу через spark-submit, вам не нужно дополнительно устанавливать pyspark, основная цель установки через pip здесь — позволить вашей IDE иметь подсказки кода.
Механизм запуска рабочего процесса PySpark
Принцип работы PySpark заключается в запуске одного (или нескольких, с pythonExec и envVars в качестве ключа) процесса демона Python через PythonRDD в Spark, а затем, как только задача будет выполнена, он разветвит новый рабочий процесс python через демона python. обработать. Воркеры Python можно использовать повторно, и они не будут уничтожены сразу после того, как будут израсходованы. Процесс прихода задачи заключается в том, чтобы увидеть, есть ли свободное время у работника, и если есть, вернуться напрямую. Если нет, форкните нового работника.
Как PySpark реализует переменную singleton в воркере
Из предыдущего механизма запуска рабочего процесса PySpark мы видим, что рабочий процесс Python может многократно выполнять задачи. В задачах NLP нам часто приходится загружать так много словарей, что мы хотим, чтобы словари загружались только один раз. На этом этапе требуется дополнительная обработка. Метод заключается в следующем:
class DictLoader(object):
clf = None
def __init__(self, baseDir, archive_auto_extract, zipResources):
if not DictLoader.is_loaded():
DictLoader.load_dic(baseDir)
@staticmethod
def load_dic(baseDir):
globPath = baseDir + "/dic/*.dic"
dicts = glob.glob(globPath)
for dictFile in dicts:
temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
jieba.load_userdict(temp)
jieba.cut("nice to meet you")
DictLoader.clf = "SUCCESS"
@staticmethod
def is_loaded():
return DictLoader.clf is not None
Определите объект cls и используйте аннотацию staicmethod, чтобы вы могли имитировать статические методы, подобные Java. Тогда вы можете делать все, что хотитеloader = DictLoader ()
Как загрузить файлы ресурсов
После обработки НЛП без словаря не обойтись.Раньше мы избегали многократной загрузки словаря воркером.Теперь другая проблема,а именно как программа загружает словарь. Обычно мы хотим иметь возможность ввести словарь в zip-пакет и код в zip-пакет, а затем отправить его с помощью следующей команды:
./bin/spark-submit \
--py-files dist/jobs.zip \
--files dist/dics.zip \
--master "local[*]" python/src/batch.py
Разработанные самостоятельно модули можно запаковать в jobs.zip, соответствующие им спарк-таски отделить в файл batch.py, а затем словарь запаковать в dics.zip.
Так как же прочитать файлы в dics.zip в программе? В автономном и локальном режимах Spark файл dics.zip не будет распаковываться в рабочем каталоге каждого воркера, поэтому требуется дополнительная обработка:
def __init__(self, baseDir, archive_auto_extract, zipResources):
if not DictLoader.is_loaded():
for zr in zipResources:
if not archive_auto_extract:
with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:
f.extractall(".")
DictLoader(baseDir)
archive_auto_extract определяет, будет ли он автоматически распаковываться (автоматически распаковываться в режиме пряжи).Метод оценки:
archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")
zipResources — это имя всех zip-пакетов, которые необходимо распаковать, и соответствующий метод получения:
zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]
Вы можете объединить каталог, в котором находятся соответствующие zip-файлы, следующим образом:
SparkFiles.getRootDirectory() + '/' + zfilename
Поэтому, если вы не работаете в режиме пряжи, вам нужно разархивировать, а затем загрузить. Рекомендуемый способ получения пути следующий:
temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
Это совместимо с запуском в среде IDE и запуском в локальном/автономном режиме/режиме пряжи.
Все предыдущие файлы jobs.zip — это файлы Python, которые можно читать напрямую без сжатия.
Активно определяйте схему, чтобы избежать искрового автоматического вывода
Я написал этот фрагмент кода раньше:
oldr = df.rdd.map(
lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))
Затем мне нужно изменить oldr обратно на rdd, на этот раз я использую:
resultDf = spark.createDataFrame(oldr)
resultDf.mode("overwrite").format(...).save(...
Это приводит к тому, что oldr выполняется дважды: один раз для анализа схемы и один раз для фактического вычисления. Мы можем написать:
from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType
fields = [StructField("ids", ArrayType(IntegerType())), StructField("mainId", IntegerType()),
StructField("tags", MapType(StringType(), IntegerType()))]
resultDf = spark.createDataFrame(resultRdd, StructType(fields=fields)
Это показывает, что схема определена для rdd, что позволяет избежать дополнительных спекуляций.
Выбор лямбды и функции
лямбды могут определять анонимные функции, но имеют ограниченную выразительность:
.map(
lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))
Мы также можем определить функции:
def create_new_row(row):
return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])
Затем используйте напрямую:
.map(create_new_row).....
Как определить функции udf / как избежать использования функций Python UDF
Сначала определите обычную функцию Python:
# 自定义split函数
def split_sentence(s):
return s.split(" ")
Преобразуйте в функцию udf и используйте.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
ss = udf(split_sentence, ArrayType(StringType()))
documentDF.select(ss("text").alias("text_array")).show()
Единственная проблема в том, что когда вы определяете функцию udf, вам нужно указать тип возвращаемого значения.
Очевидно, что при использовании функции udf в Python эффективность будет снижена, поэтому мы рекомендуем использовать стандартную библиотечную функцию, а именно:
from pyspark.sql import functions as f
documentDF.select(f.split("text", "\\s+").alias("text_array")).show()
pyspark.sql. functions
Все ссылки относятся к реализации spark, поэтому эффективность будет выше.