1. Трансформация
Операторы преобразования, обычно используемые spark, следующие:
Оператор преобразования | Значение |
---|---|
map(func) | применяются к каждому элементу в исходном RDDfuncфункции и создать новый RDD |
filter(func) | Для каждого элемента исходного СДР используйтеfuncфункция для фильтрации и создания нового RDD |
flatMap(func) | Подобно карте, но каждый входной элемент сопоставляется с нулем или более выходными элементами (funcТип возвращаемого значения должен быть Seq ). |
mapPartitions(func) | Аналогичен карте, но функция работает с каждым разделом RDD отдельно,funcТип функции — Iterator |
mapPartitionsWithIndex(func) | Аналогично mapPartitions, ноfuncТип (Int, Iterator |
sample(withReplacement, fraction, seed) | Выборка данных, есть три необязательных параметра: установить, заменять ли (withReplacement), процент выборки (fraction), начальное значение генератора случайных чисел; |
union(otherDataset) | Объединить два RDD |
intersection(otherDataset) | Найдите пересечение двух СДР |
distinct([numTasks])) | дедупликация |
groupByKey([numTasks]) | Разделение по значению ключа, то есть при вызове набора данных из (K, V) пар, он возвращает (K, Iterable Note:Если группировка должна выполнять операцию агрегирования для каждого ключа (например, сумма или среднее), используйте reduceByKey илиaggregateByKey производительность будет лучшеNote:По умолчанию степень параллелизма зависит от количества разделов родительского RDD. можно пройти в numTasks параметры изменены. |
reduceByKey(func, [numTasks]) | Сгруппируйте по ключевому значению и выполните операцию сокращения сгруппированных данных. |
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) | При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), где значение для каждого ключа агрегируется с использованием заданной функции композиции и zeroValue. Как и в случае с groupByKey, количество задач сокращения настраивается с помощью второго параметра. |
sortByKey([ascending], [numTasks]) | Сортировка по ключу, где ключ должен реализовать черту Ordered, которую можно сравнить |
join(otherDataset, [numTasks]) | При вызове набора данных типов (K, V) и (K, W) возвращает набор данных из (K, (V, W)) пар, что эквивалентно операции внутреннего соединения. Если вы хотите выполнить внешнее соединение, вы можете использоватьleftOuterJoin , rightOuterJoin а такжеfullOuterJoin Дождитесь оператора. |
cogroup(otherDataset, [numTasks]) | При вызове набора данных из (K, V) пар возвращает набор данных из (K, (Iterable |
cartesian(otherDataset) | При вызове набора данных типа T и U возвращает набор данных типа (T, U) (т. е. декартово произведение). |
coalesce(numPartitions) | Уменьшите количество разделов в RDD до numPartitions. |
repartition(numPartitions) | Произвольно масштабируйте данные в RDD, чтобы создать больше или меньше разделов и сбалансировать их. |
repartitionAndSortWithinPartitions(partitioner) | Перераспределяет RDD в соответствии с заданным разделителем и сортирует данные в разделе по значению ключа. Это лучше, чем звонитьrepartition В этом случае сортировка становится более эффективной, так как процесс сортировки переносится на машину, на которой выполняется операция перемешивания. |
Основные примеры использования этих операторов приведены ниже:
1.1 map
val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)
// 输出结果: 10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)
1.2 filter
val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)
// 输出: 10 12 21
1.3 flatMap
flatMap(func)
а такжеmap
Аналогично, но каждый входной элемент сопоставляется с нулем или более выходными элементами (funcТип возвращаемого значения должен бытьSeq
).
val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)
// 输出结果 : 10 20 30 40 50
Оператор flatMap имеет очень высокую вероятность использования в анализе журнала.Вот демонстрация: разделите каждую строку входных данных на одно слово и присвойте значение 1, представляющее одно вхождение, затем сгруппируйте по словам и подсчитайте общее количество вхождений, код выглядит следующим образом:
val lines = List("spark flume spark",
"hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)
// 输出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)
1.4 mapPartitions
Аналогичен карте, но функция работает с каждым разделом RDD отдельно,funcТип функцииIterator<T> => Iterator<U>
(где T — тип RDD), т. е. и ввод, и вывод должны иметь тип iterable.
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
val buffer = new ListBuffer[Int]
while (iterator.hasNext) {
buffer.append(iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//输出结果
100 200 300 400 500 600
1.5 mapPartitionsWithIndex
Аналогично mapPartitions, ноfuncТип(Int, Iterator<T>) => Iterator<U>
, где первый аргумент — это индекс раздела.
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
val buffer = new ListBuffer[String]
while (iterator.hasNext) {
buffer.append(index + "分区:" + iterator.next() * 100)
}
buffer.toIterator
}).foreach(println)
//输出
0 分区:100
0 分区:200
1 分区:300
1 分区:400
2 分区:500
2 分区:600
1.6 sample
Выборка данных. Есть три необязательных параметра: установить, заменять ли (withReplacement), процент выборки (fraction) и начальное число генератора случайных чисел (seed):
val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)
1.7 union
Объедините два RDD:
val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 输出: 1 2 3 4 5 6
1.8 intersection
Найдите пересечение двух СДР:
val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 输出: 4 5
1.9 distinct
Дедупликация:
val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 输出: 4 1 2
1.10 groupByKey
Сгруппировать по ключу:
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)
//输出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))
1.11 reduceByKey
Сокращение операции по клавише:
val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)
//输出
(spark,8)
(hadoop,4)
(storm,6)
1.12 sortBy & sortByKey
Сортировать по ключу:
val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 输出
(120,storm)
(90,spark)
(100,hadoop)
Сортировать по указанному элементу:
val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 输出
(storm,120)
(hadoop,100)
(spark,90)
1.13 join
При вызове набора данных типа (K, V) и (K, W) возвращает набор данных (K, (V, W)), что эквивалентно операции внутреннего соединения. Если вы хотите выполнить внешнее соединение, вы можете использоватьleftOuterJoin
, rightOuterJoin
а такжеfullOuterJoin
Дождитесь оператора.
val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)
// 输出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))
1.14 cogroup
При вызове набора данных из (K, V) пар возвращает набор данных из кортежей типа (K, (Iterable
val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)
// 输出: 同一个 RDD 中的元素先按照 key 进行分组,然后再对不同 RDD 中的元素按照 key 进行分组
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))
1.15 cartesian
Вычислите декартово произведение:
val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)
//输出笛卡尔积
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)
1.16 aggregateByKey
При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), где значение для каждого ключа агрегируется с использованием заданной функции композиции и zeroValue. а такжеgroupByKey
Точно так же количество задач сокращения может быть передано в качестве второго параметра.numPartitions
настроить. Пример выглядит следующим образом:
// 为了清晰,以下所有参数均使用具名传参
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_, _),
combOp = _ + _
).collect.foreach(println)
//输出结果:
(hadoop,3)
(storm,8)
(spark,7)
используется здесьnumSlices = 2
Укажите количество разделов для родительской операцииaggregateByKeyрапараллелить равным 2, а ее поток выполнения будет следующим:
На основе того же потока выполнения, еслиnumSlices = 1
, это означает, что введен только один раздел, тогда последний шаг combOp эквивалентен недопустимому, и результат выполнения:
(hadoop,3)
(storm,8)
(spark,4)
Аналогично, если каждое слово соответствует разделу, т.е.numSlices = 6
, что эквивалентно операции суммирования, а результат выполнения:
(hadoop,5)
(storm,14)
(spark,7)
aggregateByKey(zeroValue = 0,numPartitions = 3)
второй параметрnumPartitions
Решение состоит в том, чтобы вывести количество разделов RDD.Чтобы проверить эту проблему, вы можете переписать приведенный выше код и использоватьgetNumPartitions
Метод получения количества разделов:
sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
seqOp = math.max(_, _),
combOp = _ + _
).getNumPartitions
2. Действие
Наиболее часто используемые операторы Action в Spark:
Действие | Значение |
---|---|
reduce(func) | использовать функциюfuncвыполнить операцию сокращения |
collect() | Возвращает все элементы набора данных в виде массива, подходящего для небольших наборов результатов. |
count() | Возвращает количество элементов в наборе данных. |
first() | Возвращает первый элемент в наборе данных, эквивалентный take(1). |
take(n) | перед набором данныхnэлементы возвращаются как массив массивов. |
takeSample(withReplacement, num, [seed]) | Случайная выборка набора данных |
takeOrdered(n, [ordering]) | Возврат перед сортировкой по естественному порядку или пользовательскому компараторуnэлементы. Подходит только для небольших наборов результатов, так как все данные будут загружены в память драйвера для сортировки. |
saveAsTextFile(path) | Запишите элементы набора данных в виде текстовых файлов в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Spark будет вызывать метод toString для каждого элемента, преобразуя элемент в одну строку записей в текстовом файле. |
saveAsSequenceFile(path) | Запишите элементы набора данных как Hadoop SequenceFile в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Эта операция требует, чтобы элементы в RDD реализовывали Writable-интерфейс Hadoop. Для языка Scala он может автоматически и неявно преобразовывать базовые типы данных в Spark в соответствующие типы с возможностью записи. (В настоящее время поддерживает только Java и Scala) |
saveAsObjectFile(path) | Используя Java для сериализации и хранения, вы можете использоватьSparkContext.objectFile() загрузить. (В настоящее время поддерживает только Java и Scala) |
countByKey() | Подсчитайте количество вхождений каждого ключа. |
foreach(func) | Перебирает каждый элемент в RDD и выполняет егоfunфункция |
2.1 reduce
использовать функциюfuncВыполните операцию сокращения:
val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)
// 输出 15
2.2 takeOrdered
Возврат перед сортировкой по естественному порядку или пользовательскому компараторуnэлементы. должен быть в курсеtakeOrdered
Неявное преобразование с использованием неявных параметров, ниже приведен его исходный код. Поэтому при использовании пользовательской сортировки вам необходимо наследоватьOrdering[T]
Реализуйте собственный компаратор, а затем введите его как неявный параметр.
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
.........
}
Пользовательская сортировка правил:
// 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序
class CustomOrdering extends Ordering[(Int, String)] {
override def compare(x: (Int, String), y: (Int, String)): Int
= if (x._2.length > y._2.length) 1 else -1
}
val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
// 引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)
// 输出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)
2.3 countByKey
Подсчитайте количество вхождений каждого ключа:
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()
// 输出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)
2.4 saveAsTextFile
Запишите элементы набора данных в виде текстовых файлов в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Spark будет вызывать метод toString для каждого элемента, преобразуя элемент в одну строку записей в текстовом файле.
val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")
использованная литература
Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным