Базовое программирование RDD трех основных структур данных основного программирования Spark (1)

Большие данные Spark
Базовое программирование RDD трех основных структур данных основного программирования Spark (1)

Это первый день моего участия в Gengwen Challenge, смотрите подробности мероприятия:Обновить вызов

1 создание СДР

Существует четыре способа создания RDD в Spark:

  • Создать RDD из коллекции (в памяти)
    • Создайте RDD из коллекции, Spark в основном предоставляет два метода: parallelize и makeRDD.
    • Распараллеливание вызывается базовым кодом makeRDD, поэтому два метода одинаковы.
    //内存创建RDD
    def main(args: Array[String]): Unit = {

        val sc: SparkContext = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-Mem")
        )

        val rdd1: RDD[Int] = sc.makeRDD(
            List(1, 2, 4, 5, 6)
        )
        val rdd2: RDD[Int] = sc.parallelize(
            Array(1, 2, 3, 4, 5, 6)
        )

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)
    }

  //makeRDD源码
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
      parallelize(seq, numSlices)
  }
  • Создать RDD из внешнего хранилища (файл)
    • Создание RDD из наборов данных во внешних системах хранения включает в себя: локальные файловые системы, все наборы данных, поддерживаемые Hadoop, такие как HDFS, HBase и т. д.
    def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local")
                .setAppName("Rdd-File")
        )

        val rdd1: RDD[String] = sc.textFile("data")
        //wholeTextFiles Tuple第一个数据为文件全路径 Tuple第二个为每行数据
        val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")

        rdd1.collect().foreach(println)
        rdd2.collect().foreach(println)

    }
  • Создать из другого RDD
    • В основном с помощью операции RDD, а затем создать новый RDD. Подробную информацию см. в следующих главах.
  • Создать RDD напрямую (новое)
    • RDD строится напрямую с использованием нового метода, который обычно используется самой инфраструктурой Spark.

2 Параллелизм и разделение RDD

По умолчанию Spark может разделить задание на несколько задач и отправить их узлу Executor для параллельных вычислений, а количество задач, которые могут быть вычислены параллельно, называется степенью параллелизма. Эту сумму можно указать при построении RDD. Помните, что количество задач, выполняемых параллельно, здесь не относится к количеству разделенных задач, так что не запутайтесь.

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
    sc.makeRDD(
        List(1,2,3,4),
        4)
val fileRDD: RDD[String] =
    sc.textFile(
        "input",
        2)
fileRDD.collect().foreach(println)
sparkContext.stop()
  • При чтении данных памяти данные могут быть разделены в соответствии с настройкой параллелизма.Исходный код ядра Spark правил разделения данных выглядит следующим образом:
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of partitions required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    //计算每个分区开始位置和结束位置
    //[1,2,3,4,5] 分成两个分区后会成为 [1,2][3,4,5]
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    //下面为具体的拆分代码
    seq match {
      case r: Range =>
        positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
          // If the range is inclusive, use inclusive range for the last slice
          if (r.isInclusive && index == numSlices - 1) {
            new Range.Inclusive(r.start + start * r.step, r.end, r.step)
          }
          else {
            new Range(r.start + start * r.step, r.start + end * r.step, r.step)
          }
        }.toSeq.asInstanceOf[Seq[Seq[T]]]
      case nr: NumericRange[_] =>
        // For ranges of Long, Double, BigInteger, etc
        val slices = new ArrayBuffer[Seq[T]](numSlices)
        var r = nr
        for ((start, end) <- positions(nr.length, numSlices)) {
          val sliceSize = end - start
          slices += r.take(sliceSize).asInstanceOf[Seq[T]]
          r = r.drop(sliceSize)
        }
        slices
      case _ =>
        val array = seq.toArray // To prevent O(n^2) operations for List etc
        positions(array.length, numSlices).map { case (start, end) =>
            array.slice(start, end).toSeq
        }.toSeq
    }
  }

  • При чтении файловых данных данные нарезаются и секционируются в соответствии с правилами чтения файлов Hadoop, а правила нарезки и правила чтения данных несколько отличаются.Конкретный исходный код ядра Spark выглядит следующим образом.
override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 分区
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
 ..........

// 具体如何分区
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {

    long totalSize = 0;
    for (FileStatus file: files) {
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
      
    ...
    
    for (FileStatus file: files) {
    
        ...
    
    if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          ...

  }
  protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

3. Сериализация СДР

  • Проверка закрытия

С вычислительной точки зрения код, отличный от оператора, выполняется на стороне Драйвера, а код внутри оператора выполняется на стороне Исполнителя. Тогда в функциональном программировании scala данные вне оператора часто используются в операторе, что формирует эффект замыкания.Если данные вне оператора не могут быть сериализованы, значит, данные вне оператора не могут быть сериализованы.Если значение передается Исполнителю на исполнение, возникнет ошибка, поэтому перед выполнением расчета задачи необходимо определить, можно ли сериализовать объекты в замыкании.Эта операция называется обнаружением замыкания. Компиляция закрытия изменилась со Scala 2.12

  • Методы и свойства сериализации

С вычислительной точки зрения код, отличный от оператора, выполняется на стороне драйвера, а код внутри оператора выполняется на стороне исполнителя.Код выглядит следующим образом:

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("测试序列化")
        )

        val dept1 = new Dept(1, "研发部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }
    
    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }
//校验的代码
 private def clean(
      func: AnyRef,
      checkSerializable: Boolean,
      cleanTransitively: Boolean,
      accessedFields: Map[Class[_], Set[String]]): Unit = {
    ..............
    // 校验序列化
    if (checkSerializable) {
      ensureSerializable(func)
    }
  }
  private def ensureSerializable(func: AnyRef): Unit = {
    try {
      if (SparkEnv.get != null) {
        SparkEnv.get.closureSerializer.newInstance().serialize(func)
      }
    } catch {
      case ex: Exception => throw new SparkException("Task not serializable", ex)
    }
  }

//不实现序列号接口会跑出如下异常
//Exception in thread "main" org.apache.spark.SparkException: Task not serializable
//	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
//	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
//	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)

  • Каркас сериализации Kryo

адрес проекта:GitHub.com/esoteric, так что F…
Сериализация Java может сериализовать любой класс. Но он относительно тяжелый (больше байт), и после сериализации представление объекта тоже относительно большое. Spark Из соображений производительности Spark 2.0 начал поддерживать другой механизм сериализации Kryo. Kryo в 10 раз быстрее, чем Serializable. Простые типы данных, массивы и строковые типы уже сериализованы внутри Spark с помощью Kryo, в то время как RDD перемешивает данные.
Примечание. Даже при сериализации Kryo наследование от интерфейса Serializable.

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf()
                .setMaster("local[*]")
                .setAppName("测试序列化")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(Array(classOf[Dept]))
        )

        val dept1 = new Dept(1, "研发部")
        val dept0 = new Dept(0, "未知")

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 1),
            ("b", 4), ("F", 5), ("K", 6)
        ))

        rdd.map(t => {
            t._2 match {
                case 1 => (t._1, dept1)
                case _ => (t._1, dept0)
            }
        }).collect() foreach println


    }

    class Dept(var id: Int, var name: String) extends Serializable {
        override def toString: String = id + "\t" + name
    }