Оператор действия RDD среди трех основных структур данных Spark

задняя часть Большие данные
Оператор действия RDD среди трех основных структур данных Spark

Это 4-й день моего участия в Gengwen Challenge, смотрите подробности мероприятия:Обновить вызов

вводить

  • С точки зрения операций с данными операторы RDD можно условно разделить на две категории: преобразования и действия.

    • Оператор преобразования: Преобразование одного RDD в другой RDD — это просто суперпозиция функций, которая на самом деле не выполняется. (шаблон декоратора)
    • Оператор действия: Оператор действия фактически инициирует SparkContext для отправки задания задания.

本文讲述其中的 行动算子

1. reduce

Объединить все элементы в RDD, сначала агрегировать данные в разделе, а затем агрегировать данные между разделами.

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6)
)
//聚合
val sum = rdd.reduce(_ + _)
println(sum)

sc.stop()

2. collect

В драйвере (Driver) вернуть все элементы набора данных в виде массива Array

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6), 3
)

val list = rdd.collect()
println(list.mkString(","))

sc.stop()

3. count

Получить количество элементов в RDD

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6), 3
)

val count = rdd.count()
println(count)

sc.stop()

4. first

Возьмите первый элемент в RDD

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6), 3
)
//取第一个
val firstNum = rdd.first()
println(firstNum)

sc.stop()

5. take

Возьмите массив из первых n элементов СДР.

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 2, 4, 5, 6), 3
)
//取前三个
val numList = rdd.take(3)
println(numList.mkString(","))

sc.stop()

6. takeOrdered

Возьмите массив первых n элементов после сортировки RDD

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 7, 4, 5, 6), 3
)

//升序
//val numList = rdd.takeOrdered(3)
//降序
val numList = rdd.takeOrdered(3)(Ordering.Int.reverse)
println(numList.mkString(","))

sc.stop()

7. aggregate

Данные раздела агрегируются по начальному значению и данным в разделе, а затем данные между разделами агрегируются с начальным значением

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 7, 4, 5), 2
)

// 0+1 + 7 =8
// 0+4 + 5 =9
// 0+9 + 10=90
val res: Int = rdd.aggregate(0)(
    (i: Int, j: Int) => {
        i + j
    }
    , (i: Int, j: Int) => {
        i + j
    }
)
println(res)

// 1+1 + 7 =9
// 1+4 + 5 =10
// 1*9 * 10=90
val res2: Int = rdd.aggregate(1)(_ + _, _ * _)
println(res2)

sc.stop()

8. fold

Объединить все элементы в RDD, сначала агрегировать данные в разделе, а затем агрегировать данные между разделами.Правила расчета для внутреннего раздела и комбинированного раздела одинаковы.

Эквивалент совокупного упрощенного письма

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[Int] = sc.makeRDD(
    List(1, 7, 4, 5), 2
)

//与rdd.aggregate(0)(_ + _, _ + _)
val res: Int = rdd.fold(0)(_ + _)
println(res)

sc.stop()

9. countByKey

Подсчитайте количество каждой клавиши

//使用例子

val sc: SparkContext = new SparkContext(
    new SparkConf()
        .setMaster("local")
        .setAppName("RddMem")
)

val rdd: RDD[(String,Int)] = sc.makeRDD(
    List(("a", 1), ("b", 2), ("a", 1), ("b", 2)), 2
)

val res = rdd.countByKey()
println(res)

sc.stop()

10. saveXxxxx

Сохраняйте данные в файлы в разных форматах

//使用例子

val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)

val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
), 1)

/**
 * text文件
 */
rdd.saveAsTextFile("text")

/**
 * Sequence文件
 */
rdd.saveAsSequenceFile("sequence")

/**
 * object文件
 */
rdd.saveAsObjectFile("object")

sc.stop()

11. foreach

分布式Пройдите каждый элемент в RDD и вызовите указанную функцию

//使用例子

val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)

val rdd = sc.makeRDD(List(
    ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
), 1)

//分布式遍历
rdd.foreach(println(_))

sc.stop()