Серия Spark (10) — внешний источник данных Spark SQL

Spark

1. Введение

1.1 Поддержка нескольких источников данных

Spark поддерживает следующие шесть основных источников данных, а сообщество Spark также предоставляет сотни способов чтения источников данных, которые подходят для большинства сценариев использования.

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

Примечание. Все следующие тестовые файлы можно загрузить из этого репозитория.resourcesкаталог для загрузки

1.2 Формат чтения данных

Все API чтения следуют следующему формату вызова:

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 读取模式
.option("inferSchema", "true")       // 是否自动推断 schema
.option("path", "path/to/file(s)")   // 文件路径
.schema(someSchema)                  // 使用预定义的 schema      
.load()

Режим чтения имеет следующие три опции:

режим чтения описывать
permissive При обнаружении поврежденной записи установите для всех ее полей значение null и поместите все поврежденные записи в строковый столбец с именем _повреждение t_record.
dropMalformed удалить неверные строки
failFast Немедленный сбой на искаженных данных

1.3 Формат записи данных

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //写模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()

Режим записи данных имеет следующие четыре опции:

Scala/Java описывать
SaveMode.ErrorIfExists Выдает исключение, если файл уже существует по указанному пути, что является режимом записи данных по умолчанию.
SaveMode.Append Данные записываются в режиме добавления
SaveMode.Overwrite Данные записываются в режиме перезаписи
SaveMode.Ignore Ничего не делать, если файл уже существует по указанному пути

2. CSV

CSV — это распространенный формат текстового файла, в котором каждая строка представляет собой запись, а каждое поле в записи разделяется запятыми.

2.1 Чтение CSV-файла

Пример автоматического определения типа read read:

spark.read.format("csv")
.option("header", "false")        // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST")      // 是否快速失败
.option("inferSchema", "true")   // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()

Используйте предопределенные типы:

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预定义数据格式
val myManualSchema = new StructType(Array(
    StructField("deptno", LongType, nullable = false),
    StructField("dname", StringType,nullable = true),
    StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()

2.2 Запись в файл CSV

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")

Также можно указать конкретные разделители:

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

2.3 Дополнительная конфигурация

Чтобы сократить объем основной статьи, все элементы конфигурации чтения и записи показаны в Разделе 9.1 в конце статьи.


3. JSON

3.1 Файл JSON для чтения

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

Следует отметить, что: запись данных, занимающая несколько строк, по умолчанию не поддерживается (как указано ниже), что можно настроить с помощьюmultiLineдляtrueчтобы внести изменения, по умолчаниюfalse.

// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默认不支持多行
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}

3.2 Запись JSON-файла

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

3.3 Дополнительная конфигурация

Чтобы сократить объем основной статьи, все элементы конфигурации чтения и записи показаны в Разделе 9.2 в конце статьи.


4. Паркет

Parquet – это хранилище данных с открытым исходным кодом, ориентированное на столбцы. Оно обеспечивает множество оптимизаций хранения, позволяющих считывать отдельные столбцы, а не весь файл, что не только экономит место на диске, но и повышает эффективность чтения. Это формат файла по умолчанию в Spark. .

4.1 Чтение файлов Parquet

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

2.2 Запись файлов Parquet

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

2.3 Дополнительная конфигурация

Файлы паркета имеют свои собственные правила хранения, поэтому необязательных элементов конфигурации немного, наиболее часто используемые из них следующие:

операции чтения и записи элемент конфигурации необязательное значение По умолчанию описывать
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy
None Формат сжатого файла
Read mergeSchema true, false Зависит от элемента конфигурацииspark.sql.parquet.mergeSchema Если установлено значение true, источник данных Parquet для всех файлов данных, собранных схемой, объединяется вместе или выберите из схемы сводного файла, если нет доступного сводного файла, выберите файл из схемы случайных данных.

Дополнительные дополнительные конфигурации см. в официальной документации:spark.apache.org/docs/latest…


5. ОРЦ

ORC — это столбцовый формат файлов с самоописанием и поддержкой типов, оптимизированный для чтения и записи больших данных, и широко используемый формат файлов для больших данных.

5.1 Чтение файла ORC

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)

4.2 Запись файла ORC

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")

Шестой, базы данных SQL

Spark также поддерживает чтение и запись данных в традиционных реляционных базах данных. Однако программа Spark не предоставляет драйвер базы данных по умолчанию, поэтому вам необходимо загрузить соответствующий драйвер базы данных в каталог установки перед его использованием.jarsв каталоге. В следующем примере используется база данных Mysql, прежде чем использовать соответствующийmysql-connector-java-x.x.x.jarзагрузить наjarsПод содержанием.

6.1 Чтение данных

Пример чтения полных данных таблицы выглядит следующим образом, здесьhelp_keywordэто словарная таблица, встроенная в mysql, толькоhelp_keyword_idа такжеnameдва поля.

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //数据库地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)

Чтение данных из результатов запроса:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()

//输出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+

Вы также можете использовать следующий метод записи для фильтрации данных:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10  OR name = 'WHEN'")   //指定数据过滤条件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 

//输出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+

можно использоватьnumPartitionsУкажите степень параллелизма для чтения данных:

option("numPartitions", 10)

Здесь, помимо указания разделов, вы также можете установить верхнюю и нижнюю границы, любое значение меньше нижней границы будет присвоено первому разделу, а любое значение больше верхней границы будет присвоено последнему разделу.

val colName = "help_keyword_id"   //用于判断上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分区综述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                             colName,lowerBound,upperBound,numPartitions,props)

Чтобы проверить содержимое раздела, вы можете использоватьmapPartitionsWithIndexКод этого оператора выглядит следующим образом:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append(index + "分区:" + iterator.next())
    }
    buffer.toIterator
}).foreach(println)

Результат выполнения следующий:help_keywordВ этой таблице всего около 600 фрагментов данных.Изначально данные должны быть равномерно распределены по 10 разделам, но в разделе 0 319 фрагментов данных.Это связано с тем, что установлен нижний предел, и все данные меньше 300 будут ограничивается первым разделом. , который является разделом 0. Точно так же все данные больше 500 размещаются в разделе 9, который является последним разделом.

https://github.com/heibaiying

6.2 Запись данных

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

7. Текст

Текстовые файлы не имеют никаких преимуществ с точки зрения производительности чтения и записи и не могут выражать четкую структуру данных, поэтому они используются меньше.Операции чтения и записи следующие:

7.1 Чтение текстовых данных

spark.read.textFile("/usr/file/txt/dept.txt").show()

7.2 Запись текстовых данных

df.write.text("/tmp/spark/txt/dept")

8. Расширенные возможности чтения и записи данных

8.1 Параллельное чтение

Несколько исполнителей не могут читать один и тот же файл одновременно, но они могут одновременно читать разные файлы. Это означает, что когда вы читаете данные из папки, содержащей несколько файлов, каждый из этих файлов становится разделом в DataFrame и считывается параллельно доступными исполнителями.

8.2 Параллельная запись

Количество записей файлов или данных зависит от количества разделов, которое содержит файл dataframe, когда данные написаны. По умолчанию один файл записан на раздел данных.

8.3 Запись раздела

Раздел и суб-бочка это два понятия и суб-улей в таблице разделов и таблице это одна и та же бочка. Данные разделены в соответствии с определенными сохраненными правилами. должен быть в курсеpartitionByУказанный раздел и раздел в RDD — это не одно и то же: здесьРазделы отображаются как подкаталоги выходного каталога, данные хранятся в соответствующих подкаталогах соответственно.

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")

Результаты вывода следующие: Вы можете видеть, что вывод разделен на три подкаталога в соответствии с номером отдела, и соответствующие выходные файлы находятся в подкаталогах.

https://github.com/heibaiying

8.3 Запись сегмента

Запись сегментов — это хеширование данных в соответствии с указанным количеством столбцов и сегментов. В настоящее время запись сегментов поддерживает только сохранение в виде таблицы. Фактически, это таблица сегментов Hive.

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

8.5 Управление размером файла

Если количество небольших файлов, созданных при записи, слишком велико, возникнет большой объем служебных данных метаданных. Spark, как и HDFS, плохо справляется с этой проблемой, которую называют «проблемой маленького файла». В то же время файл данных не должен быть слишком большим, иначе возникнет ненужная нагрузка на производительность при выполнении запросов, поэтому размер файла следует контролировать в разумных пределах.

Выше мы представили, что количество сгенерированных файлов может контролироваться количеством разделов, тем самым косвенно контролируя размер файла. Spark 2.2 представил новый способ управления размером файла более автоматизированным способом, которыйmaxRecordsPerFileпараметр, который позволяет вам контролировать размер файла, контролируя количество записей, записываемых в файл.

 // Spark 将确保文件最多包含 5000 条记录
df.write.option(“maxRecordsPerFile”, 5000)

9. Дополнительное приложение конфигурации

9.1 Дополнительная конфигурация чтения и записи CSV

операции чтения\записи элемент конфигурации необязательное значение По умолчанию описывать
Both seq любой персонаж ,(запятая) разделитель
Both header true, false false Является ли первая строка в файле именем столбца.
Read escape любой персонаж \ управляющий символ
Read inferSchema true, false false Следует ли автоматически определять типы столбцов
Read ignoreLeadingWhiteSpace true, false false пропускать ли пробелы перед значениями
Both ignoreTrailingWhiteSpace true, false false пропускать ли пробелы после значения
Both nullValue любой персонаж "" какой символ в файле объявления представляет нулевое значение
Both nanValue любой персонаж NaN Объявите, какое значение представляет NaN или значение по умолчанию
Both positiveInf любой персонаж Inf Положительная бесконечность
Both negativeInf любой персонаж -Inf отрицательная бесконечность
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none Формат сжатия файлов
Both dateFormat Любой преобразование в Java
Строка SimpleDateFormat
yyyy-MM-dd Формат даты
Both timestampFormat Все, что можно преобразовать в Java
Строка SimpleDateFormat
гггг-ММдд’T’ЧЧ:мм:сс.SSSZZ Формат временной метки
Read maxColumns любое целое число 20480 Максимальное количество столбцов в файле объявления
Read maxCharsPerColumn любое целое число 1000000 Объявите максимальное количество символов в столбце.
Read escapeQuotes true, false true Следует ли экранировать кавычки в строках.
Read maxMalformedLogPerPartition любое целое число 10 Объявите максимальное количество искаженных данных, разрешенных в каждом разделе, сверх которого искаженные данные не будут считаны.
Write quoteAll true, false false Следует указать, все ли значения заключены в кавычки, а не только экранировать значение символами кавычек.
Read multiLine true, false false Разрешить ли несколько строк для полной записи в разных доменах

9.2 Дополнительная конфигурация чтения и записи JSON

операции чтения\записи элемент конфигурации необязательное значение По умолчанию
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none
Both dateFormat Любую Java можно преобразовать в строку SimpleDataFormat yyyy-MM-dd
Both timestampFormat Любая строка, которую можно преобразовать в SimpleDataFormat Java. гггг-ММдд’T’ЧЧ:мм:сс.SSSZZ
Read primitiveAsString true, false false
Read allowComments true, false false
Read allowUnquotedFieldNames true, false false
Read allowSingleQuotes true, false true
Read allowNumericLeadingZeros true, false false
Read allowBackslashEscapingAnyCharacter true, false false
Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
Read multiLine true, false false

9.3 Дополнительная конфигурация чтения и записи базы данных

Имя свойства имея в виду
url адрес базы данных
dbtable имя таблицы
driver управляемый базой данных
partitionColumn,
lowerBound, upperBoun
Общее количество разделов, верхняя граница, нижняя граница
numPartitions Максимальное количество разделов, которые можно использовать для параллелизма чтения и записи таблиц. Если количество разделов, подлежащих записи, превышает этот предел, для сброса количества разделов может быть вызвана функция «coalesce(numpartition)».
fetchsize Сколько строк данных извлекать за цикл. Этот параметр применяется только для чтения данных.
batchsize Сколько строк данных вставлять за круговой обход, этот параметр применяется только для записи данных. Значение по умолчанию — 1000.
isolationLevel Уровень изоляции транзакции: может быть NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ или SERIALIZABLE, стандартный уровень изоляции транзакции.
Значение по умолчанию — READ_UNCOMMITTED. Этот параметр применяется только к чтению данных.
createTableOptions Настройте конфигурацию, связанную с созданием таблицы при записи данных
createTableColumnTypes Настройте тип столбца созданного столбца при записи данных

Дополнительные сведения о настройке чтения и записи базы данных см. в официальной документации:spark.apache.org/docs/latest…

использованная литература

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
  2. spark.apache.org/docs/latest…

Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным