1. Введение
1.1 Поддержка нескольких источников данных
Spark поддерживает следующие шесть основных источников данных, а сообщество Spark также предоставляет сотни способов чтения источников данных, которые подходят для большинства сценариев использования.
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files
Примечание. Все следующие тестовые файлы можно загрузить из этого репозитория.resourcesкаталог для загрузки
1.2 Формат чтения данных
Все API чтения следуют следующему формату вызова:
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()
// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 读取模式
.option("inferSchema", "true") // 是否自动推断 schema
.option("path", "path/to/file(s)") // 文件路径
.schema(someSchema) // 使用预定义的 schema
.load()
Режим чтения имеет следующие три опции:
режим чтения | описывать |
---|---|
permissive |
При обнаружении поврежденной записи установите для всех ее полей значение null и поместите все поврежденные записи в строковый столбец с именем _повреждение t_record. |
dropMalformed |
удалить неверные строки |
failFast |
Немедленный сбой на искаженных данных |
1.3 Формат записи данных
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
Режим записи данных имеет следующие четыре опции:
Scala/Java | описывать |
---|---|
SaveMode.ErrorIfExists |
Выдает исключение, если файл уже существует по указанному пути, что является режимом записи данных по умолчанию. |
SaveMode.Append |
Данные записываются в режиме добавления |
SaveMode.Overwrite |
Данные записываются в режиме перезаписи |
SaveMode.Ignore |
Ничего не делать, если файл уже существует по указанному пути |
2. CSV
CSV — это распространенный формат текстового файла, в котором каждая строка представляет собой запись, а каждое поле в записи разделяется запятыми.
2.1 Чтение CSV-файла
Пример автоматического определения типа read read:
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
Используйте предопределенные типы:
import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预定义数据格式
val myManualSchema = new StructType(Array(
StructField("deptno", LongType, nullable = false),
StructField("dname", StringType,nullable = true),
StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
2.2 Запись в файл CSV
df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
Также можно указать конкретные разделители:
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
2.3 Дополнительная конфигурация
Чтобы сократить объем основной статьи, все элементы конфигурации чтения и записи показаны в Разделе 9.1 в конце статьи.
3. JSON
3.1 Файл JSON для чтения
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
Следует отметить, что: запись данных, занимающая несколько строк, по умолчанию не поддерживается (как указано ниже), что можно настроить с помощьюmultiLine
дляtrue
чтобы внести изменения, по умолчаниюfalse
.
// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}
//默认不支持多行
{
"DEPTNO": 10,
"DNAME": "ACCOUNTING",
"LOC": "NEW YORK"
}
3.2 Запись JSON-файла
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
3.3 Дополнительная конфигурация
Чтобы сократить объем основной статьи, все элементы конфигурации чтения и записи показаны в Разделе 9.2 в конце статьи.
4. Паркет
Parquet – это хранилище данных с открытым исходным кодом, ориентированное на столбцы. Оно обеспечивает множество оптимизаций хранения, позволяющих считывать отдельные столбцы, а не весь файл, что не только экономит место на диске, но и повышает эффективность чтения. Это формат файла по умолчанию в Spark. .
4.1 Чтение файлов Parquet
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
2.2 Запись файлов Parquet
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
2.3 Дополнительная конфигурация
Файлы паркета имеют свои собственные правила хранения, поэтому необязательных элементов конфигурации немного, наиболее часто используемые из них следующие:
операции чтения и записи | элемент конфигурации | необязательное значение | По умолчанию | описывать |
---|---|---|---|---|
Write | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
None | Формат сжатого файла |
Read | mergeSchema | true, false | Зависит от элемента конфигурацииspark.sql.parquet.mergeSchema
|
Если установлено значение true, источник данных Parquet для всех файлов данных, собранных схемой, объединяется вместе или выберите из схемы сводного файла, если нет доступного сводного файла, выберите файл из схемы случайных данных. |
Дополнительные дополнительные конфигурации см. в официальной документации:spark.apache.org/docs/latest…
5. ОРЦ
ORC — это столбцовый формат файлов с самоописанием и поддержкой типов, оптимизированный для чтения и записи больших данных, и широко используемый формат файлов для больших данных.
5.1 Чтение файла ORC
spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
4.2 Запись файла ORC
csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
Шестой, базы данных SQL
Spark также поддерживает чтение и запись данных в традиционных реляционных базах данных. Однако программа Spark не предоставляет драйвер базы данных по умолчанию, поэтому вам необходимо загрузить соответствующий драйвер базы данных в каталог установки перед его использованием.jars
в каталоге. В следующем примере используется база данных Mysql, прежде чем использовать соответствующийmysql-connector-java-x.x.x.jar
загрузить наjars
Под содержанием.
6.1 Чтение данных
Пример чтения полных данных таблицы выглядит следующим образом, здесьhelp_keyword
это словарная таблица, встроенная в mysql, толькоhelp_keyword_id
а такжеname
два поля.
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)
Чтение данных из результатов запроса:
val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()
//输出
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 10| ALTER|
| 11| ANALYSE|
| 12| ANALYZE|
| 13| AND|
| 14| ARCHIVE|
| 15| AREA|
| 16| AS|
| 17| ASBINARY|
| 18| ASC|
| 19| ASTEXT|
+---------------+-----------+
Вы также можете использовать следующий метод записи для фильтрации данных:
val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'") //指定数据过滤条件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show()
//输出:
+---------------+-----------+
|help_keyword_id| name|
+---------------+-----------+
| 0| <>|
| 1| ACTION|
| 2| ADD|
| 3|AES_DECRYPT|
| 4|AES_ENCRYPT|
| 5| AFTER|
| 6| AGAINST|
| 7| AGGREGATE|
| 8| ALGORITHM|
| 9| ALL|
| 604| WHEN|
+---------------+-----------+
можно использоватьnumPartitions
Укажите степень параллелизма для чтения данных:
option("numPartitions", 10)
Здесь, помимо указания разделов, вы также можете установить верхнюю и нижнюю границы, любое значение меньше нижней границы будет присвоено первому разделу, а любое значение больше верхней границы будет присвоено последнему разделу.
val colName = "help_keyword_id" //用于判断上下界的列
val lowerBound = 300L //下界
val upperBound = 500L //上界
val numPartitions = 10 //分区综述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
colName,lowerBound,upperBound,numPartitions,props)
Чтобы проверить содержимое раздела, вы можете использоватьmapPartitionsWithIndex
Код этого оператора выглядит следующим образом:
jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分区:" + iterator.next())
}
buffer.toIterator
}).foreach(println)
Результат выполнения следующий:help_keyword
В этой таблице всего около 600 фрагментов данных.Изначально данные должны быть равномерно распределены по 10 разделам, но в разделе 0 319 фрагментов данных.Это связано с тем, что установлен нижний предел, и все данные меньше 300 будут ограничивается первым разделом. , который является разделом 0. Точно так же все данные больше 500 размещаются в разделе 9, который является последним разделом.
6.2 Запись данных
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()
7. Текст
Текстовые файлы не имеют никаких преимуществ с точки зрения производительности чтения и записи и не могут выражать четкую структуру данных, поэтому они используются меньше.Операции чтения и записи следующие:
7.1 Чтение текстовых данных
spark.read.textFile("/usr/file/txt/dept.txt").show()
7.2 Запись текстовых данных
df.write.text("/tmp/spark/txt/dept")
8. Расширенные возможности чтения и записи данных
8.1 Параллельное чтение
Несколько исполнителей не могут читать один и тот же файл одновременно, но они могут одновременно читать разные файлы. Это означает, что когда вы читаете данные из папки, содержащей несколько файлов, каждый из этих файлов становится разделом в DataFrame и считывается параллельно доступными исполнителями.
8.2 Параллельная запись
Количество записей файлов или данных зависит от количества разделов, которое содержит файл dataframe, когда данные написаны. По умолчанию один файл записан на раздел данных.
8.3 Запись раздела
Раздел и суб-бочка это два понятия и суб-улей в таблице разделов и таблице это одна и та же бочка. Данные разделены в соответствии с определенными сохраненными правилами. должен быть в курсеpartitionBy
Указанный раздел и раздел в RDD — это не одно и то же: здесьРазделы отображаются как подкаталоги выходного каталога, данные хранятся в соответствующих подкаталогах соответственно.
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
Результаты вывода следующие: Вы можете видеть, что вывод разделен на три подкаталога в соответствии с номером отдела, и соответствующие выходные файлы находятся в подкаталогах.
8.3 Запись сегмента
Запись сегментов — это хеширование данных в соответствии с указанным количеством столбцов и сегментов. В настоящее время запись сегментов поддерживает только сохранение в виде таблицы. Фактически, это таблица сегментов Hive.
val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
8.5 Управление размером файла
Если количество небольших файлов, созданных при записи, слишком велико, возникнет большой объем служебных данных метаданных. Spark, как и HDFS, плохо справляется с этой проблемой, которую называют «проблемой маленького файла». В то же время файл данных не должен быть слишком большим, иначе возникнет ненужная нагрузка на производительность при выполнении запросов, поэтому размер файла следует контролировать в разумных пределах.
Выше мы представили, что количество сгенерированных файлов может контролироваться количеством разделов, тем самым косвенно контролируя размер файла. Spark 2.2 представил новый способ управления размером файла более автоматизированным способом, которыйmaxRecordsPerFile
параметр, который позволяет вам контролировать размер файла, контролируя количество записей, записываемых в файл.
// Spark 将确保文件最多包含 5000 条记录
df.write.option(“maxRecordsPerFile”, 5000)
9. Дополнительное приложение конфигурации
9.1 Дополнительная конфигурация чтения и записи CSV
операции чтения\записи | элемент конфигурации | необязательное значение | По умолчанию | описывать |
---|---|---|---|---|
Both | seq | любой персонаж |
, (запятая) |
разделитель |
Both | header | true, false | false | Является ли первая строка в файле именем столбца. |
Read | escape | любой персонаж | \ | управляющий символ |
Read | inferSchema | true, false | false | Следует ли автоматически определять типы столбцов |
Read | ignoreLeadingWhiteSpace | true, false | false | пропускать ли пробелы перед значениями |
Both | ignoreTrailingWhiteSpace | true, false | false | пропускать ли пробелы после значения |
Both | nullValue | любой персонаж | "" | какой символ в файле объявления представляет нулевое значение |
Both | nanValue | любой персонаж | NaN | Объявите, какое значение представляет NaN или значение по умолчанию |
Both | positiveInf | любой персонаж | Inf | Положительная бесконечность |
Both | negativeInf | любой персонаж | -Inf | отрицательная бесконечность |
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none | Формат сжатия файлов |
Both | dateFormat | Любой преобразование в Java Строка SimpleDateFormat |
yyyy-MM-dd | Формат даты |
Both | timestampFormat | Все, что можно преобразовать в Java Строка SimpleDateFormat |
гггг-ММдд’T’ЧЧ:мм:сс.SSSZZ | Формат временной метки |
Read | maxColumns | любое целое число | 20480 | Максимальное количество столбцов в файле объявления |
Read | maxCharsPerColumn | любое целое число | 1000000 | Объявите максимальное количество символов в столбце. |
Read | escapeQuotes | true, false | true | Следует ли экранировать кавычки в строках. |
Read | maxMalformedLogPerPartition | любое целое число | 10 | Объявите максимальное количество искаженных данных, разрешенных в каждом разделе, сверх которого искаженные данные не будут считаны. |
Write | quoteAll | true, false | false | Следует указать, все ли значения заключены в кавычки, а не только экранировать значение символами кавычек. |
Read | multiLine | true, false | false | Разрешить ли несколько строк для полной записи в разных доменах |
9.2 Дополнительная конфигурация чтения и записи JSON
операции чтения\записи | элемент конфигурации | необязательное значение | По умолчанию |
---|---|---|---|
Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy |
none |
Both | dateFormat | Любую Java можно преобразовать в строку SimpleDataFormat | yyyy-MM-dd |
Both | timestampFormat | Любая строка, которую можно преобразовать в SimpleDataFormat Java. | гггг-ММдд’T’ЧЧ:мм:сс.SSSZZ |
Read | primitiveAsString | true, false | false |
Read | allowComments | true, false | false |
Read | allowUnquotedFieldNames | true, false | false |
Read | allowSingleQuotes | true, false | true |
Read | allowNumericLeadingZeros | true, false | false |
Read | allowBackslashEscapingAnyCharacter | true, false | false |
Read | columnNameOfCorruptRecord | true, false | Value of spark.sql.column&NameOf |
Read | multiLine | true, false | false |
9.3 Дополнительная конфигурация чтения и записи базы данных
Имя свойства | имея в виду |
---|---|
url | адрес базы данных |
dbtable | имя таблицы |
driver | управляемый базой данных |
partitionColumn, lowerBound, upperBoun |
Общее количество разделов, верхняя граница, нижняя граница |
numPartitions | Максимальное количество разделов, которые можно использовать для параллелизма чтения и записи таблиц. Если количество разделов, подлежащих записи, превышает этот предел, для сброса количества разделов может быть вызвана функция «coalesce(numpartition)». |
fetchsize | Сколько строк данных извлекать за цикл. Этот параметр применяется только для чтения данных. |
batchsize | Сколько строк данных вставлять за круговой обход, этот параметр применяется только для записи данных. Значение по умолчанию — 1000. |
isolationLevel | Уровень изоляции транзакции: может быть NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ или SERIALIZABLE, стандартный уровень изоляции транзакции. Значение по умолчанию — READ_UNCOMMITTED. Этот параметр применяется только к чтению данных. |
createTableOptions | Настройте конфигурацию, связанную с созданием таблицы при записи данных |
createTableColumnTypes | Настройте тип столбца созданного столбца при записи данных |
Дополнительные сведения о настройке чтения и записи базы данных см. в официальной документации:spark.apache.org/docs/latest…
использованная литература
- Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
- spark.apache.org/docs/latest…
Другие статьи серии больших данных можно найти в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным