Это 4-й день моего участия в Gengwen Challenge, смотрите подробности мероприятия:Обновить вызов
вводить
-
С точки зрения операций с данными операторы RDD можно условно разделить на две категории: преобразования и действия.
- Оператор преобразования: Преобразование одного RDD в другой RDD — это просто суперпозиция функций, которая на самом деле не выполняется. (шаблон декоратора)
- Оператор действия: Оператор действия фактически инициирует SparkContext для отправки задания задания.
本文讲述其中的 行动算子
1. reduce
Объединить все элементы в RDD, сначала агрегировать данные в разделе, а затем агрегировать данные между разделами.
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6)
)
//聚合
val sum = rdd.reduce(_ + _)
println(sum)
sc.stop()
2. collect
В драйвере (Driver) вернуть все элементы набора данных в виде массива Array
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6), 3
)
val list = rdd.collect()
println(list.mkString(","))
sc.stop()
3. count
Получить количество элементов в RDD
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6), 3
)
val count = rdd.count()
println(count)
sc.stop()
4. first
Возьмите первый элемент в RDD
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6), 3
)
//取第一个
val firstNum = rdd.first()
println(firstNum)
sc.stop()
5. take
Возьмите массив из первых n элементов СДР.
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 2, 4, 5, 6), 3
)
//取前三个
val numList = rdd.take(3)
println(numList.mkString(","))
sc.stop()
6. takeOrdered
Возьмите массив первых n элементов после сортировки RDD
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 7, 4, 5, 6), 3
)
//升序
//val numList = rdd.takeOrdered(3)
//降序
val numList = rdd.takeOrdered(3)(Ordering.Int.reverse)
println(numList.mkString(","))
sc.stop()
7. aggregate
Данные раздела агрегируются по начальному значению и данным в разделе, а затем данные между разделами агрегируются с начальным значением
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 7, 4, 5), 2
)
// 0+1 + 7 =8
// 0+4 + 5 =9
// 0+9 + 10=90
val res: Int = rdd.aggregate(0)(
(i: Int, j: Int) => {
i + j
}
, (i: Int, j: Int) => {
i + j
}
)
println(res)
// 1+1 + 7 =9
// 1+4 + 5 =10
// 1*9 * 10=90
val res2: Int = rdd.aggregate(1)(_ + _, _ * _)
println(res2)
sc.stop()
8. fold
Объединить все элементы в RDD, сначала агрегировать данные в разделе, а затем агрегировать данные между разделами.Правила расчета для внутреннего раздела и комбинированного раздела одинаковы.
Эквивалент совокупного упрощенного письма
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[Int] = sc.makeRDD(
List(1, 7, 4, 5), 2
)
//与rdd.aggregate(0)(_ + _, _ + _)
val res: Int = rdd.fold(0)(_ + _)
println(res)
sc.stop()
9. countByKey
Подсчитайте количество каждой клавиши
//使用例子
val sc: SparkContext = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("RddMem")
)
val rdd: RDD[(String,Int)] = sc.makeRDD(
List(("a", 1), ("b", 2), ("a", 1), ("b", 2)), 2
)
val res = rdd.countByKey()
println(res)
sc.stop()
10. saveXxxxx
Сохраняйте данные в файлы в разных форматах
//使用例子
val sc = new SparkContext(
new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
), 1)
/**
* text文件
*/
rdd.saveAsTextFile("text")
/**
* Sequence文件
*/
rdd.saveAsSequenceFile("sequence")
/**
* object文件
*/
rdd.saveAsObjectFile("object")
sc.stop()
11. foreach
分布式
Пройдите каждый элемент в RDD и вызовите указанную функцию
//使用例子
val sc = new SparkContext(
new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val rdd = sc.makeRDD(List(
("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
), 1)
//分布式遍历
rdd.foreach(println(_))
sc.stop()