Серия Spark (4) — Подробное объяснение общих операторов RDD

Spark

1. Трансформация

Операторы преобразования, обычно используемые spark, следующие:

Оператор преобразования Значение
map(func) применяются к каждому элементу в исходном RDDfuncфункции и создать новый RDD
filter(func) Для каждого элемента исходного СДР используйтеfuncфункция для фильтрации и создания нового RDD
flatMap(func) Подобно карте, но каждый входной элемент сопоставляется с нулем или более выходными элементами (funcТип возвращаемого значения должен быть Seq ).
mapPartitions(func) Аналогичен карте, но функция работает с каждым разделом RDD отдельно,funcТип функции — Iterator => Iterator , где T — тип RDD , т. е. RDD[T]
mapPartitionsWithIndex(func) Аналогично mapPartitions, ноfuncТип (Int, Iterator) => Iterator , где первый параметр — это индекс раздела.
sample(withReplacement, fraction, seed) Выборка данных, есть три необязательных параметра: установить, заменять ли (withReplacement), процент выборки (fraction), начальное значение генератора случайных чисел;
union(otherDataset) Объединить два RDD
intersection(otherDataset) Найдите пересечение двух СДР
distinct([numTasks])) дедупликация
groupByKey([numTasks]) Разделение по значению ключа, то есть при вызове набора данных из (K, V) пар, он возвращает (K, Iterable)
Note:Если группировка должна выполнять операцию агрегирования для каждого ключа (например, сумма или среднее), используйтеreduceByKeyилиaggregateByKeyпроизводительность будет лучше
Note:По умолчанию степень параллелизма зависит от количества разделов родительского RDD. можно пройти вnumTasksпараметры изменены.
reduceByKey(func, [numTasks]) Сгруппируйте по ключевому значению и выполните операцию сокращения сгруппированных данных.
aggregateByKey(zeroValue,numPartitions)(seqOp, combOp, [numTasks]) При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), где значение для каждого ключа агрегируется с использованием заданной функции композиции и zeroValue. Как и в случае с groupByKey, количество задач сокращения настраивается с помощью второго параметра.
sortByKey([ascending], [numTasks]) Сортировка по ключу, где ключ должен реализовать черту Ordered, которую можно сравнить
join(otherDataset, [numTasks]) При вызове набора данных типов (K, V) и (K, W) возвращает набор данных из (K, (V, W)) пар, что эквивалентно операции внутреннего соединения. Если вы хотите выполнить внешнее соединение, вы можете использоватьleftOuterJoin, rightOuterJoinа такжеfullOuterJoinДождитесь оператора.
cogroup(otherDataset, [numTasks]) При вызове набора данных из (K, V) пар возвращает набор данных из (K, (Iterable, Iterable)) кортежей.
cartesian(otherDataset) При вызове набора данных типа T и U возвращает набор данных типа (T, U) (т. е. декартово произведение).
coalesce(numPartitions) Уменьшите количество разделов в RDD до numPartitions.
repartition(numPartitions) Произвольно масштабируйте данные в RDD, чтобы создать больше или меньше разделов и сбалансировать их.
repartitionAndSortWithinPartitions(partitioner) Перераспределяет RDD в соответствии с заданным разделителем и сортирует данные в разделе по значению ключа. Это лучше, чем звонитьrepartitionВ этом случае сортировка становится более эффективной, так как процесс сортировки переносится на машину, на которой выполняется операция перемешивания.

Основные примеры использования этих операторов приведены ниже:

1.1 map

val list = List(1,2,3)
sc.parallelize(list).map(_ * 10).foreach(println)

// 输出结果: 10 20 30 (这里为了节省篇幅去掉了换行,后文亦同)

1.2 filter

val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)

// 输出: 10 12 21

1.3 flatMap

flatMap(func)а такжеmapАналогично, но каждый входной элемент сопоставляется с нулем или более выходными элементами (funcТип возвращаемого значения должен бытьSeq).

val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)

// 输出结果 : 10 20 30 40 50

Оператор flatMap имеет очень высокую вероятность использования в анализе журнала.Вот демонстрация: разделите каждую строку входных данных на одно слово и присвойте значение 1, представляющее одно вхождение, затем сгруппируйте по словам и подсчитайте общее количество вхождений, код выглядит следующим образом:

val lines = List("spark flume spark",
                 "hadoop flume hive")
sc.parallelize(lines).flatMap(line => line.split(" ")).
map(word=>(word,1)).reduceByKey(_+_).foreach(println)

// 输出:
(spark,2)
(hive,1)
(hadoop,1)
(flume,2)

1.4 mapPartitions

Аналогичен карте, но функция работает с каждым разделом RDD отдельно,funcТип функцииIterator<T> => Iterator<U>(где T — тип RDD), т. е. и ввод, и вывод должны иметь тип iterable.

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
  val buffer = new ListBuffer[Int]
  while (iterator.hasNext) {
    buffer.append(iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//输出结果
100 200 300 400 500 600

1.5 mapPartitionsWithIndex

Аналогично mapPartitions, ноfuncТип(Int, Iterator<T>) => Iterator<U>, где первый аргумент — это индекс раздела.

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
  val buffer = new ListBuffer[String]
  while (iterator.hasNext) {
    buffer.append(index + "分区:" + iterator.next() * 100)
  }
  buffer.toIterator
}).foreach(println)
//输出
0 分区:100
0 分区:200
1 分区:300
1 分区:400
2 分区:500
2 分区:600

1.6 sample

Выборка данных. Есть три необязательных параметра: установить, заменять ли (withReplacement), процент выборки (fraction) и начальное число генератора случайных чисел (seed):

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)

1.7 union

Объедините два RDD:

val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// 输出: 1 2 3 4 5 6

1.8 intersection

Найдите пересечение двух СДР:

val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// 输出:  4 5

1.9 distinct

Дедупликация:

val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// 输出: 4 1 2

1.10 groupByKey

Сгруппировать по ключу:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)

//输出:
(spark,List(3, 5))
(hadoop,List(2, 2))
(storm,List(6))

1.11 reduceByKey

Сокращение операции по клавише:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)

//输出
(spark,8)
(hadoop,4)
(storm,6)

1.12 sortBy & sortByKey

Сортировать по ключу:

val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
sc.parallelize(list01).sortByKey(ascending = false).foreach(println)
// 输出
(120,storm)
(90,spark)
(100,hadoop)

Сортировать по указанному элементу:

val list02 = List(("hadoop",100), ("spark",90), ("storm",120))
sc.parallelize(list02).sortBy(x=>x._2,ascending=false).foreach(println)
// 输出
(storm,120)
(hadoop,100)
(spark,90)

1.13 join

При вызове набора данных типа (K, V) и (K, W) возвращает набор данных (K, (V, W)), что эквивалентно операции внутреннего соединения. Если вы хотите выполнить внешнее соединение, вы можете использоватьleftOuterJoin, rightOuterJoinа такжеfullOuterJoinДождитесь оператора.

val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)

// 输出
(1,(student01,teacher01))
(3,(student03,teacher03))
(2,(student02,teacher02))

1.14 cogroup

При вызове набора данных из (K, V) пар возвращает набор данных из кортежей типа (K, (Iterable, Iterable)).

val list01 = List((1, "a"),(1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"),(3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02),sc.parallelize(list03)).foreach(println)

// 输出: 同一个 RDD 中的元素先按照 key 进行分组,然后再对不同 RDD 中的元素按照 key 进行分组
(1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
(3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
(2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))

1.15 cartesian

Вычислите декартово произведение:

val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)

//输出笛卡尔积
(A,1)
(A,2)
(A,3)
(B,1)
(B,2)
(B,3)
(C,1)
(C,2)
(C,3)

1.16 aggregateByKey

При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), где значение для каждого ключа агрегируется с использованием заданной функции композиции и zeroValue. а такжеgroupByKeyТочно так же количество задач сокращения может быть передано в качестве второго параметра.numPartitionsнастроить. Пример выглядит следующим образом:

// 为了清晰,以下所有参数均使用具名传参
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list,numSlices = 2).aggregateByKey(zeroValue = 0,numPartitions = 3)(
      seqOp = math.max(_, _),
      combOp = _ + _
    ).collect.foreach(println)
//输出结果:
(hadoop,3)
(storm,8)
(spark,7)

используется здесьnumSlices = 2Укажите количество разделов для родительской операцииaggregateByKeyрапараллелить равным 2, а ее поток выполнения будет следующим:

https://github.com/heibaiying

На основе того же потока выполнения, еслиnumSlices = 1, это означает, что введен только один раздел, тогда последний шаг combOp эквивалентен недопустимому, и результат выполнения:

(hadoop,3)
(storm,8)
(spark,4)

Аналогично, если каждое слово соответствует разделу, т.е.numSlices = 6, что эквивалентно операции суммирования, а результат выполнения:

(hadoop,5)
(storm,14)
(spark,7)

aggregateByKey(zeroValue = 0,numPartitions = 3)второй параметрnumPartitionsРешение состоит в том, чтобы вывести количество разделов RDD.Чтобы проверить эту проблему, вы можете переписать приведенный выше код и использоватьgetNumPartitionsМетод получения количества разделов:

sc.parallelize(list,numSlices = 6).aggregateByKey(zeroValue = 0,numPartitions = 3)(
  seqOp = math.max(_, _),
  combOp = _ + _
).getNumPartitions

https://github.com/heibaiying

2. Действие

Наиболее часто используемые операторы Action в Spark:

Действие Значение
reduce(func) использовать функциюfuncвыполнить операцию сокращения
collect() Возвращает все элементы набора данных в виде массива, подходящего для небольших наборов результатов.
count() Возвращает количество элементов в наборе данных.
first() Возвращает первый элемент в наборе данных, эквивалентный take(1).
take(n) перед набором данныхnэлементы возвращаются как массив массивов.
takeSample(withReplacement, num, [seed]) Случайная выборка набора данных
takeOrdered(n, [ordering]) Возврат перед сортировкой по естественному порядку или пользовательскому компараторуnэлементы. Подходит только для небольших наборов результатов, так как все данные будут загружены в память драйвера для сортировки.
saveAsTextFile(path) Запишите элементы набора данных в виде текстовых файлов в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Spark будет вызывать метод toString для каждого элемента, преобразуя элемент в одну строку записей в текстовом файле.
saveAsSequenceFile(path) Запишите элементы набора данных как Hadoop SequenceFile в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Эта операция требует, чтобы элементы в RDD реализовывали Writable-интерфейс Hadoop. Для языка Scala он может автоматически и неявно преобразовывать базовые типы данных в Spark в соответствующие типы с возможностью записи. (В настоящее время поддерживает только Java и Scala)
saveAsObjectFile(path) Используя Java для сериализации и хранения, вы можете использоватьSparkContext.objectFile()загрузить. (В настоящее время поддерживает только Java и Scala)
countByKey() Подсчитайте количество вхождений каждого ключа.
foreach(func) Перебирает каждый элемент в RDD и выполняет егоfunфункция

2.1 reduce

использовать функциюfuncВыполните операцию сокращения:

 val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)

// 输出 15

2.2 takeOrdered

Возврат перед сортировкой по естественному порядку или пользовательскому компараторуnэлементы. должен быть в курсеtakeOrderedНеявное преобразование с использованием неявных параметров, ниже приведен его исходный код. Поэтому при использовании пользовательской сортировки вам необходимо наследоватьOrdering[T]Реализуйте собственный компаратор, а затем введите его как неявный параметр.

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  .........
}

Пользовательская сортировка правил:

// 继承 Ordering[T],实现自定义比较器,按照 value 值的长度进行排序
class CustomOrdering extends Ordering[(Int, String)] {
    override def compare(x: (Int, String), y: (Int, String)): Int
    = if (x._2.length > y._2.length) 1 else -1
}

val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
//  引入隐式默认值
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)

// 输出: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban)

2.3 countByKey

Подсчитайте количество вхождений каждого ключа:

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()

// 输出: Map(hadoop -> 2, storm -> 2, azkaban -> 1)

2.4 saveAsTextFile

Запишите элементы набора данных в виде текстовых файлов в локальную файловую систему, HDFS или другую файловую систему, поддерживаемую Hadoop. Spark будет вызывать метод toString для каждого элемента, преобразуя элемент в одну строку записей в текстовом файле.

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")

использованная литература

RDD Programming Guide

Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным