предисловие
Предыдущая статья была потрясена, так что давайте перейдем к сути этой.
2. Платформа вычислений памяти Spark (выделено 😶)
RDD (устойчивый распределенный набор данных) называетсяУстойчивые распределенные наборы данных, является самой простой абстракцией данных в Spark, она представляет собой неизменяемую разделяемую коллекцию, элементы которой можно вычислять параллельно.
Dataset:就是一个集合,存储很多数据.
Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.
Resilient:表示弹性,rdd的数据是可以保存在内存或者是磁盘中.
Производительность в коде такая, результат, соответствующий каждому методу, представляет собой RDD, например код scala выше, результат следующего RDD будет зависеть от предыдущего RDD.
Я знаю, что пока никто этого не понял, ничего страшного, просто продолжайте читать и вы поймете😏
2.1 Пять характеристик RDD
Далее объяснение RDD в исходниках, я использовал его в пунктах.я думаю(Примите опровержение 👌) Более разумное заявление для объяснения
2.1.1 A list of partitions
Список разделов, которые составляют данные для этого RDD.
Это означает, что rdd имеет много разделов, и каждый раздел содержит часть данных rdd.Задачи в spark выполняются как потоки задач, и один раздел соответствует одному потоку задач.
Пользователь может указать количество разделов RDD при создании RDD, если не указать, будет использоваться значение по умолчанию. (Например: количество разделов RDD, сгенерированных при чтении файлов данных в HDFS, равно количеству блоков)
На самом деле разбиение RDD можно просто понять таким образом.Например, я хочу использовать сейчас wordCount, а размер текста 300M.Согласно нашей процедуре HDFS, каждые 128M - это блок, затем 300M файл состоит из 3 блоков, и тогда наш RDD определит количество разделов RDD в соответствии с количеством блоков, принадлежащих вашему файлу.На данный момент количество разделов RDD равно 3, но если сам файл меньше 128M , тогда RDD по умолчанию будет иметь 2 раздела.
2.1.2 A function for computing each split
Функция расчета каждого раздела рассматривается как СДР.--- Расчет RDD в Spark основан на разделах, и каждый RDD реализует функцию вычисления для достижения этой цели.
2.1.3 A list of dependencies on other RDDs
rdd будет зависеть от нескольких других rdd--- На этой функции основана зависимость между rdd и rdd, механизм отказоустойчивости искровых задач.
2.1.4 Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Существует только функция разделения для RDD типа ключ-значение., функция разбиения фактически разбрасывает результаты вычислений по разным разбиениям.
В настоящее время в Spark реализовано два типа функций секционирования, один из которых основан на хеше.HashPartitioner, другой основан на диапазонеRangePartitioner.
Только для RDD со значением ключа и перетасовки будет Partitioner, а значение Partitioner для RDD без значения ключа равно None.
Подпрограмма HashPartitioner находится в предыдущемMapReduceВ статье уже упоминалось, на самом деле, многие процедуры разделения больших данных являются этой процедурой.RangePartitioner похож на игру, в которой оговаривается, сколько нужно потерять этот раздел и сколько потерять этот раздел.
2.1.5 Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
Расположение вычислительной задачи имеет приоритет как место, где хранится каждый раздел.(настраиваемый)
Это включает в себя локальность данных, и расположение блоков данных является оптимальным. Проще говоря, это означает, что там, где есть данные, мы делаем расчет там, где они есть.
Когда задачи искры запланированы, ониприоритет(Обратите внимание, что это только приоритет, а не необходимость) рассмотреть узлы, у которых есть данные, чтобы начать вычислительные задачи, сократить передачу данных по сети и повысить эффективность вычислений.
2.2 Анализ пяти атрибутов RDD на основе wordCount
Требование состоит в том, чтобы иметь файл размером 300M в HDFS, реализовать статистику слов файла через Spark и, наконец, сохранить данные результатов в HDFS.Код выглядит следующим образом.Обратите внимание, что мы используем Scala вместо кода Java.
sc.textFile("/words.txt") // 读取数据文件
.flatMap(_.split(" ")) // 切分每一行,获取所有单词
.map((_,1)) // 每个单词计为1
.reduceByKey(_+_) // 相同单词(key)出现的1累加
.saveAsTextFile("/out") // 保存输出到/out
Поскольку я не развертывал среду локально, я не буду делать скриншоты следующего процесса, но объясню шаги операции и результаты.
2.2.1 RDD1 : sc.textFile("/words.txt")
Запускаем Spark-shell, затем пошагово запускаем код, в этот момент мы видим
На этом этапе мы можем выполнить код scala. Первый шаг — ввести sc.textFile("/words.txt")
Поскольку RDD является абстрактным классом, результат sc.textFile("/words.txt") получен его подклассом MapPartitionsRDD, этот код получит результат раздела RDD, мы также можем передать
sc.textFile("/words.txt").partitions
Глядя на раздел, в это время будет получен массив, и длина этого массива равна 3 (Файл 300M будет иметь 3 блока, и количество разделов RDD определяется блоком.Обратите внимание, что количество разделов RDD не обязательно может быть на разных серверах. Но если блок только 1, количество разделов RDD по умолчанию будет равно 2.)
sc.textFile("/words.txt").partitions.length
Выполните это, чтобы проверить длину массива, которая должна быть 3
В это время все результаты RDD2, RDD3 и RDD1 получены MapPartitionsRDD, разница в том, что RDD, полученный из карты, будет иметь тип ключ-значение.
2.2.2 RDD4 : sc.textFile("/words.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+)
Тип приема RDD4 - ShuffleRDD, потому что результаты в это время должны быть сгруппированы по ключу, и неизбежно произойдет перетасовка. Перетасовка может быть легко понята в первую очередь. Например, в нашем текущем wordCount, words.txt имеет только 3 ключа , которые являются "zookeeper". , "kafka", "spark", то я делаю это правило, когда тасую
В этот момент мы saveAsTextFile получим файлы 3. Нетрудно обнаружить, что в RDD есть несколько файлов с несколькими разделами. Пять основных характеристик каждого СДР также указаны в правой части рисунка.
2.3 Как создаются RDD
2.3.1 Сборка из существующей коллекции scala
val rdd1=sc.parallelize(List(1,2,3,4,5))
val rdd2=sc.parallelize(Array("zookeeper","kafka","spark"))
val rdd3=sc.makeRDD(List(1,2,3,4))
2.3.2 Загрузка внешних источников данных для сборки
val rdd1=sc.textFile("/words.txt")
2.3.3 Преобразование существующего rdd для создания нового rdd
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.map((_,1))
2.4 Классификация операторов RDD
2.4.1 преобразование
Создайте новый rdd на основе существующего преобразования rdd, он загружается лениво, он не будет выполняться немедленно
Например, map, flatMap, reduceByKey только что использовались в wordCount.
2.4.2 действие
Это фактически вызовет запуск задачи. Верните данные результата расчета rdd в драйвер или сохраните данные результата на внешнем носителе.
Например, collect, saveAsTextFile и т. д., которые только что использовались в wordCount.
2.5 Общие операторы RDD (в сочетании с описанием кода позже)
2.5.1 Операторы преобразования
конвертировать | значение |
---|---|
map(func) | Возвращает новый RDD, состоящий из каждого элемента ввода, преобразованного функцией func. |
filter(func) | Возвращает новый RDD, состоящий из элементов ввода, которые возвращают значение true после оценки функцией func. |
flatMap(func) | Подобно карте, но каждый входной элемент может быть сопоставлен с 0 или более выходными элементами (поэтому func должна возвращать последовательность, а не один элемент) |
mapPartitions(func) | Подобно карте, но работает независимо с каждым осколком СДР, поэтому при работе с СДР типа T тип функции func должен быть Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | Подобно mapPartitions, но func принимает целочисленный параметр, представляющий значение индекса раздела, поэтому при работе с RDD типа T тип функции func должен быть (Int, Interator[T]) => Iterator[U] |
union(otherDataset) | Возвращает новый RDD после объединения исходного RDD и параметра RDD. |
intersection(otherDataset) | Возвращает новый RDD после пересечения исходного RDD и параметра RDD. |
distinct([numTasks])) | Возвращает новый RDD после дедупликации исходного RDD |
groupByKey([numTasks]) | Вызывается для RDD (K, V), возвращает RDD (K, Iterator[V]) |
reduceByKey(func, [numTasks]) | Вызовите RDD из (K, V), верните RDD из (K, V) и используйте указанную функцию сокращения для агрегирования значений одного и того же ключа.Похоже на groupByKey, количество задач сокращения может быть передано через второй необязательный параметр установить |
sortByKey([ascending], [numTasks]) | Вызванный на RDD (K, V), K должен реализовать интерфейс Ordered и вернуть RDD (K, V), отсортированный по ключу. |
sortBy(func,[ascending], [numTasks]) | Аналогичен sortByKey, но более гибкий. |
join(otherDataset, [numTasks]) | Вызывается для СДР типа (K, V) и (K, W), возвращает СДР типа (K, (V, W)) со всеми элементами, соответствующими одному и тому же ключу, соединенному вместе. |
cogroup(otherDataset, [numTasks]) | Вызывается для RDD типа (K, V) и (K, W), возвращает RDD типа (K, (Iterable, Iterable)) |
coalesce(numPartitions) | Уменьшите количество разделов RDD до указанного значения. |
repartition(numPartitions) | Переразбить RDD |
repartitionAndSortWithinPartitions(partitioner) | Переразбить RDD и отсортировать каждый раздел по ключу записи |
2.5.2 Оператор действия
действие | значение |
---|---|
reduce(func) | reduce передает первые два элемента в RDD функции ввода для генерации нового возвращаемого значения.Вновь сгенерированное возвращаемое значение и следующий элемент (третий элемент) в RDD образуют два элемента, которые затем передаются во входную функцию до тех пор, пока В конце есть только одно значение. |
collect() | В драйвере вернуть все элементы набора данных в виде массива |
count() | Возвращает количество элементов в RDD |
first() | Возвращает первый элемент RDD (аналогично take(1)) |
take(n) | Возвращает массив первых n элементов набора данных. |
takeOrdered(n, [ordering]) | Возвращает первые n элементов в естественном или пользовательском порядке |
saveAsTextFile(path) | Сохраните элементы набора данных в виде текстовых файлов в файловой системе HDFS или других поддерживаемых файловых системах.Для каждого элемента Spark вызовет метод toString, чтобы заменить его текстом в файле. |
saveAsSequenceFile(path) | Сохраните элементы набора данных в указанный каталог в формате файла последовательности Hadoop, который можно использовать в HDFS или других файловых системах, поддерживаемых Hadoop. |
saveAsObjectFile(path) | Сохраните элементы набора данных в указанный каталог в виде сериализации Java. |
countByKey() | Для RDD типа (K, V) возвращается карта (K, Int), представляющая количество элементов, соответствующих каждому ключу. |
foreach(func) | Для каждого элемента набора данных запустите функцию func |
foreachPartition(func) | В каждом разделе набора данных запустите функцию func |
2.6 Код Описание общих операторов
Ну, я забыл, что у меня нет локальной среды...
Напоминаю, что операция оператора преобразования выполняется, а реального результата нет, нужно запустить collect(), чтобы действительно начать расчет.
2.6.1 Примечание: перераспределение и объединение
val rdd1 = sc.parallelize(1 to 10,3)
//打印rdd1的分区数
rdd1.partitions.size
//利用repartition改变rdd1分区数
//减少分区
rdd1.repartition(2).partitions.size
//增加分区
rdd1.repartition(4).partitions.size
//利用coalesce改变rdd1分区数
//减少分区
rdd1.coalesce(2).partitions.size
repartition : перераспределение с перемешиванием может использоваться для работы с небольшими файлами.
объединение: объединение разделов/уменьшение разделов, без перемешивания по умолчанию
Объединение по умолчанию не может увеличить количество разделов. Если вы не добавите истинный параметр или не используете перераспределение.
Применимая сцена:
- Если вы хотите перетасовать, используйте перераспределение
- Нет необходимости перемешивать, просто объединить разделы, объединить
- repartition часто используется для расширения разделов.
2.6.2 Примечание: map, mapPartitions и mapPartitionsWithIndex
map: используется для обхода RDD, применения функции f к каждому элементу и возврата нового RDD (оператор преобразования).
mapPartitions: используется для обхода каждого раздела в RDD и возврата для создания нового RDD (оператор преобразования).
Суммировать:
Если в процессе сопоставления необходимо часто создавать дополнительные объекты, использование mapPartitions более эффективно, чем map.
Например, чтобы записать все данные в RDD в базу данных через соединение JDBC, если вы используете функцию карты, вам может потребоваться создать соединение для каждого элемента, что очень дорого.Если вы используете mapPartitions, вам нужно только создать соединение для каждого раздела.
2.6.3 Примечание: foreach, foreachPartition
foreach: используется для обхода RDD, применения функции f к каждому элементу, без возвращаемого значения (оператор действия).
foreachPartition: используется для обхода каждого раздела в операции RDD. Нет возвращаемого значения (оператор действия).
Суммировать:
Как правило, использование операторов mapPartitions или foreachPartition более эффективно, чем map и foreach, и их рекомендуется использовать.
Таким образом, мы можем использовать оператор foreachPartition для достижения
2.7 СДР-зависимости
Существует два разных типа отношений между RDD и его зависимым родительским RDD: узкая зависимость и широкая зависимость.
2.7.1 Узкие зависимости
Узкая зависимость означает, что раздел каждого родительского RDD используется не более чем одним разделом дочернего RDD, например map/flatMap/filter/union и т. д., иВсе узкие зависимости не перемешиваются
2.7.2 Широкие зависимости
Широкая зависимость означает, что разделы нескольких дочерних RDD будут зависеть от разделов одного и того же родительского RDD, например, reduceByKey/sortByKey/groupBy/groupByKey/join и т. д.
. Все широкие зависимости будут перемешаны
На приведенном выше рисунке также показано, что операция соединения делится на широкие зависимости и узкие зависимости.Если RDD имеет один и тот же разделитель, это не вызовет перетасовки.Это соединение является узкой зависимостью, в противном случае это широкая зависимость.
2.8 lineage
Lineage переводится как кровь. Та же картина, что и раньше.
Lineage of RDD будет записывать информацию о метаданных и поведение преобразования RDD. Lineage сохраняет зависимости RDD. Когда часть данных раздела RDD потеряна, он может повторно работать и восстанавливать потерянный раздел данных на основе этой информации.
Однако следует отметить, что RDD поддерживает толькоКрупнозернистые преобразования(то есть записывать только одну операцию, выполняемую в одном блоке), например, если в это время я являюсь номером 0 RDD4, я снова получу все данные RDD3, а затем снова уменьшу по ключу, чтобы восстановить в результате эту широкую Зависимость нужно перетасовать, а стоимость восстановления будет гораздо выше
Стоит отметить, что: нам не нужно вмешательство человека в восстановление данных раздела, сама программа может помочь нам восстановить в соответствии с отношением происхождения RDD
2.9 Механизм кэширования RDD
Данные rdd можно кэшировать, и другим заданиям необходимо использовать результирующие данные rdd, которые можно получить непосредственно из кэша, избегая повторных вычислений. Кэширование предназначено для ускорения последующих операций доступа к данным.
Например, на картинке выше мне нужно кэшировать только результаты RDD2, если я его использую в это время, то восстановить будет очень удобно.
2.9.1 Как установить кэш на RDD
RDD может кэшировать результаты предыдущих вычислений с помощью метода сохранения или метода кэширования. Но нужно обратить вниманиеДело не в том, что эти два метода кэшируются сразу же при их вызове, а в том, что когда запускается последующее действие, RDD будет кэшироваться в памяти вычислительного узла и повторно использоваться позже..
Мы можем взглянуть на сводку исходного кода RDD, чтобы узнать, как выглядят эти два метода.
При просмотре исходного кода обнаруживается, что кеш, наконец, вызывает метод persist, а уровень кеша по умолчанию равенMEMORY_ONLYОни хранятся только в памяти.Существует много уровней хранения Spark.Уровень хранения определяется в объекте StorageLevel.
StorageLevel в Scalaobject(Обратите внимание, что здесь я использую строчные буквы, объект означает одноэлементный объект, а не объект в Java), существует много разных уровней хранения, поэтому я не буду их здесь раскрывать, этот английский язык нетруден для понимания.
Разница между cache и persist заключается в том, что cache: по умолчанию кэширует данные в памяти, суть его в вызове метода persist, и persist: можно кэшировать данные в памяти или на диске, есть богатые уровни кэша, эти уровни кэша определенный в StorageLevel в этом объекте.
2.9.2 Когда использовать кеш
При первом использовании RDD2 для выполнения соответствующей операции оператора для получения RDD3 расчет начнется с RDD1, сначала будет прочитан файл в HDFS, затем выполнена соответствующая операция оператора на RDD1 для получения RDD2, а затем расчет с RDD2 для получения СДР3. Также для расчета RDD4 будет пересчитана предыдущая логика.
По умолчанию, если операция оператора выполняется над СДР несколько раз, СДР один раз пересчитывает СДР и предыдущий родительский СДР. Такая ситуация часто встречается при реальной разработке кода, но мы должныИзбегайте многократного повторения расчета RDD, иначе это приведет к резкому падению производительности..
Для того, чтобы получить результирующие данные СДР, после большого количества операторских операций или сложной логики вычислений, то есть когда данные определенного СДР добыты тяжким трудом, можно установить кеш
Резюме: СДР, которые используются несколько раз, то есть общедоступные СДР, можно сохранить, чтобы избежать последующих потребностей, и повторно пересчитать для повышения эффективности.
2.9.3 Очистить данные кеша
Автоматически очищать: после завершения приложения соответствующие кэшированные данные будут автоматически очищены.
Ручная очистка: позвоните в RDDunpersistметод
Хотя мы можем кэшировать данные RDD, сохранять их в памяти или на диске, а затем получать прямо из памяти или с диска, но обратите внимание наЭтот метод не особенно безопасен.
кеш. Он напрямую хранит данные в памяти, и последующая операция выполняется быстрее, и она получается прямо из памяти. Но этот способ очень небезопасен, так как сервер зависает или процесс завершается, что приведет к потере данных.
persist может сохранять данные на локальный диск, а данные можно получить с диска позже, но это не особо безопасно.Из-за каких-то неправильных действий системного администратора, или диск поврежден, также может привести к потери данных.
Так у нас есть более безопасный способ?
2.10 Механизм контрольной точки RDD
Checkpoint обеспечивает относительно более надежный способ сохранения данных. Он хранит данные в распределенной файловой системе, такой как HDFS. Здесь используется высокая доступность HDFS, высокая отказоустойчивость (несколько копий) для обеспечения безопасности данных в наибольшей степени.
2.10.1 Как установить контрольную точку
1. Настройте каталог контрольных точек на HDFS.
sc.setCheckpointDir("hdfs://node1:9000/checkpoint")
2. Вызовите метод контрольной точки на rdd, который необходимо отметить контрольной точкой.
val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
val rdd2=rdd1.flatMap(_.split(" "))
3. Наконец, должна быть операция действия, чтобы запустить выполнение задачи (Для выполнения операции контрольной точки требуется операция действия, а операция действия соответствует последующему заданию. После выполнения задания оно отдельно запустит другое задание для выполнения операции rdd1.checkpoint.)
Оптимизируйте использование контрольной точки.Перед вызовом операции контрольной точки вы можете выполнить операцию кэширования для кэширования данных результата, соответствующих rdd, а затем вы можете напрямую получить данные rdd из кэша и записать их в указанный каталог контрольной точки.
rdd2.collect
Итак, давайте суммируем различия между кешем, сохранением и контрольной точкой.
кешировать и сохранять
cache默认数据缓存在内存中
persist可以把数据保存在内存或者磁盘中
后续要触发 cache 和 persist 持久化操作,需要有一个action操作
它不会开启其他新的任务,一个action操作就对应一个job
它不会改变RDD的依赖关系,程序运行完成后对应的缓存数据就自动消失
checkpoint
可以把数据持久化写入到 HDFS 上
后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作
它会改变RDD的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。
(因为它判断你已经持久化到 HDFS 中所以把依赖关系删除了)
程序运行完成后对应的checkpoint数据就不会消失
2.11 Генерация DAG-направленного ациклического графа
DAG (направленный ациклический граф) называется направленным ациклическим графом (с направлением, без замкнутого цикла, представляющим поток данных).Исходный RDD формирует DAG посредством серии преобразований.
Это DAG, сгенерированный в нашем примере wordCount, мы можем увидеть эту картинку в описании spark-shell ---> Completed Jobs of Running Application в веб-интерфейсе, упомянутом в предыдущей лекции.
2.11.1 DAG разделен на этапы
Что такое стадия: задание будет разделено на несколько групп задач, каждая группа задач называется стадией, стадия представляет различные этапы планирования, а искровое задание будет генерировать множество стадий соответственно.
Есть 2 типа стадии:
- ShuffleMapStage: последнийВсе преобразования перед перемешиваниемОн называется ShuffleMapStage, а соответствующая ему задача — shuffleMapTask.
- Стадия Результата: последняяОперации после перетасовкиНазывается ResultStage, это последняя стадия, соответствующая задача — ResultTask.
2.11.2 Зачем нам нужно делить этапы
Разделите DAG на разные этапы (этапы планирования) в соответствии с различными зависимостями между RDD.
Для узких зависимостей обработка преобразования разделов рассчитывается на этапе
Для широких зависимостей из-за наличия Shuffle следующее вычисление может быть запущено только после обработки родительского RDD.
Поскольку после разделения этапа на одном и том же этапе остаются только узкие зависимости и нет широких зависимостей, можно реализовать конвейерные вычисления.
Каждый раздел на этапе соответствует задаче, и существует множество задач, которые могут выполняться параллельно на одном этапе.
2.11.3 Как разделить этапы
Разделяем этап исходя из широких зависимостей
Сначала сгенерируйте ациклический граф, ориентированный на DAG, в соответствии с порядком операций оператора rdd, затем продвиньтесь вперед от последнего RDD, создайте новый этап и добавьте rdd к этапу, который является последним этапом.
В процессе продвижения, если встретится узкая зависимость, к этому этапу будет добавлено СДР, если встретится широкая зависимость, то она будет вырезана из положения широкой зависимости, а последний этап будет разделен.
Пересоздайте новый этап, и продолжайте продвигаться по второму шагу, пока не закончится первый RDD, все деление этапа.
2.11.4 Связь между стадией и стадией
После того, как этапы разделены, есть много задач, которые могут выполняться параллельно на каждом этапе.Позднее задачи на каждом этапе инкапсулируются в набор наборов задач, и, наконец, наборы наборов задач отправляются один за другим процессу-исполнителю на рабочий узел для запуска.
Существует отношение зависимости между RDD и RDD, а также отношение зависимости между этапом и этапом.Сначала выполняется задача на предыдущем этапе, а после завершения операции выполняется задача на последующем этапе, т. е. скажем, входные данные задачи на последнем этапе - это предыдущий этап.Выходные данные результата задачи в .
finally
Выше мы вкратце рассмотрели некоторые базовые знания RDD, а некоторые более глубокие и подробные аспекты будут объяснены в Spark Core и Spark Streaming, Заинтересованные друзья могут продолжать обращать на них внимание.