Базовое программирование RDD трех основных структур данных основного программирования Spark (2)

Spark
Базовое программирование RDD трех основных структур данных основного программирования Spark (2)

Это второй день моего участия в Gengwen Challenge.Подробности мероприятия смотрите:Обновить вызов

Приквел

Базовое программирование RDD трех основных структур данных основного программирования Spark (1)

4. СДР-зависимости

4.1 RDD кровное родство

СДР поддерживают только грубые преобразования, т. е. одну операцию, выполняемую с большим количеством записей. Серия родословных, создающих RDD, записывается, чтобы можно было восстановить потерянные разделы. Происхождение RDD записывает информацию о метаданных RDD и поведение преобразования.Когда часть данных раздела RDD потеряна, он может повторно работать и восстанавливать потерянные разделы данных на основе этой информации.

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ), 2)
        println(rdd.toDebugString)

        println("--------------------------------------")

        val rdd1 = rdd.map(t => (t._1, t._2 * 2))
        println(rdd1.toDebugString)

        println("--------------------------------------")

        val rdd2 = rdd1.mapValues(_ + 100)
        println(rdd2.toDebugString)

        println("--------------------------------------")

        val rdd3 = rdd2.reduceByKey(_ + _)
        println(rdd3.toDebugString)

        println("--------------------------------------")

        val res = rdd2.collect()
        println(res.mkString("\n"))

    }
(2) ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[1] at map at _1.scala:27 []
 |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
 |  MapPartitionsRDD[1] at map at _1.scala:27 []
 |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------
(2) ShuffledRDD[3] at reduceByKey at _1.scala:37 []
 +-(2) MapPartitionsRDD[2] at mapValues at _1.scala:32 []
    |  MapPartitionsRDD[1] at map at _1.scala:27 []
    |  ParallelCollectionRDD[0] at makeRDD at _1.scala:19 []
--------------------------------------

4.2 СДР-зависимости

Так называемая зависимость здесь на самом деле является отношением между двумя соседними RDD.

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ), 2)
        println(rdd.dependencies)

        println("--------------------------------------")

        val rdd1 = rdd.map(t => (t._1, t._2 * 2))
        println(rdd1.dependencies)

        println("--------------------------------------")

        val rdd2 = rdd1.mapValues(_ + 100)
        println(rdd2.dependencies)

        println("--------------------------------------")

        val rdd3 = rdd2.reduceByKey(_ + _)
        println(rdd3.dependencies)

        println("--------------------------------------")

        val res = rdd2.collect()
        println(res.mkString("\n"))

    }
List()
--------------------------------------
List(org.apache.spark.OneToOneDependency@38704ff0)
--------------------------------------
List(org.apache.spark.OneToOneDependency@44de94c3)
--------------------------------------
List(org.apache.spark.ShuffleDependency@2c58dcb1)
--------------------------------------

4.3 Узкие зависимости RDD

Узкая зависимость означает, что каждый родительский (восходящий) раздел RDD используется не более чем одним разделом дочернего (нисходящего) RDD.

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

4.4 Широкие зависимости RDD

Широкая зависимость означает, что раздел одного и того же родительского (восходящего) RDD зависит от раздела нескольких дочерних (нисходящих) RDD, что приведет к перемешиванию.

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false,
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
  extends Dependency[Product2[K, V]] {
  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)
  val shuffleId: Int = _rdd.context.newShuffleId()
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
  _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
}

4.5 Подразделение этапа RDD

DAG (направленный ациклический граф) представляет собой направленный диаграмма топологии ациклического графа, состоящий из точек и линий, узор, имеющего направление, не закрытое. Например, DAG записывает этап RDD процесса и задач преобразования.

image.png

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties): Unit = {
    var finalStage: ResultStage = null
    try {
        //创建新阶段可能会抛出异常,例如,作业运行在
        //删除底层HDFS文件的HadoopRDD。
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
    .......
  }
  
  /**
   * 创建一个与提供的jobId相关联的ResultStage
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }
  
   /**
   * 获取或创建给定RDD的父阶段列表。新的stage将使用提供的firstJobId创建
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
  
  /**
  * 如果在shuffleIdToMapStage中存在shuffle map stage,则获取一个shuffle map stage。否则,如果洗牌地图阶段不存在,该方法将创建洗牌地图阶段,以及任何丢失的祖先洗牌地图阶段。
  */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        //为所有缺失的祖先洗牌依赖创建阶段。
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          //即使getmissing祖宗shuffledependencies只返回shuffle依赖
          //没有在shuffleIdToMapStage中,当我们
          //在foreach循环中获取一个特定的依赖,它被添加到
          // shuffleIdToMapStage通过早期依赖的阶段创建过程。看到
          // SPARK-13902获取更多信息。
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        //最后,为给定的shuffle依赖创建一个stage。
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

4.6 Задача RDD

Сегментация задач RDD делится на: приложение, задание, этап и задачу.

  1. Приложение: инициализируйте SparkContext для создания приложения;
  2. Job: Оператор Action создаст Job;
  3. Стадия: Стадия равна количеству ShuffleDependency плюс 1;
  4. Задача: на этапе Этап количество разделов последнего RDD равно количеству Задач.

注意:Application->Job->Stage->Task每一层都是1对n的关系

//划分源码
val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }
  }
  
// Figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

//findMissingPartitions 有两个实现类

//ShuffleMapStage实现
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
  .findMissingPartitions(shuffleDep.shuffleId)
  .getOrElse(0 until numPartitions)

//ResultStage实现
  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

5. Устойчивость RDD

5.1 кэш кеша RDD

  • RDD кэширует результаты предыдущих вычислений с помощью метода Cache или Persist.По умолчанию данные кэшируются в динамической памяти JVM. Однако RDD не кэшируется сразу при вызове двух методов, а когда срабатывает оператор последующего действия, RDD будет кэшироваться в памяти вычислительного узла и повторно использоваться позже.
    • Кэш может быть потерян или данные, хранящиеся в памяти, могут быть удалены из-за нехватки памяти Механизм отказоустойчивости кеша RDD гарантирует, что вычисления могут быть выполнены правильно, даже если кеш потерян. Через ряд преобразований на основе RDD потерянные данные будут пересчитаны.Поскольку каждый раздел RDD относительно независим, необходимо вычислить только потерянную часть, а все разделы не нужно пересчитывать.
    • Spark автоматически сохранит некоторые промежуточные данные операций Shuffle (например: reduceByKey). Цель этого состоит в том, чтобы избежать повторного вычисления всего ввода при сбое перетасовки узла. Однако в реальных условиях, если вы хотите повторно использовать данные, по-прежнему рекомендуется вызывать persist или cache.
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        val mapRdd = rdd.map(i => {
            println("map--------")
            ("a", i)
        })

        mapRdd.cache() //源码调用的persist()
        
        //指定存储级别
        //mapRdd.persist(StorageLevel.DISK_ONLY)

        mapRdd.reduceByKey(_ + _).collect().foreach(println)
        mapRdd.groupByKey().collect().foreach(println)

        sc.stop()


    }
StorageLevel的所有存储级别
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

5.2 Контрольно-пропускной пункт RDD CheckPoint

  • Так называемая контрольная точка фактически записывает промежуточные результаты RDD на диск.
  • Поскольку в крови, и зависимой слишком долго, вызвало затраты на толерантность, лучше проверить толерантность точек на средней стадии. Если контрольно-пропускной пункт имеет проблемы, вы можете начать кровь с контрольной точки, чтобы уменьшить накладные расходы.
  • Операция контрольной точки на RDD не будет выполняться немедленно, и для ее запуска необходимо выполнить операцию Action.
def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        //设置检查点存储路径
        sc.setCheckpointDir("./check")

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        val mapRdd = rdd.map(i => {
            println("map--------")
            ("a", i)
        })

        mapRdd.cache()
        // checkpoint会把前面的RDD执行两次 配合cache()即可只执行一次
        mapRdd.checkpoint()

        mapRdd.reduceByKey(_ + _).collect().foreach(println)
        mapRdd.groupByKey().collect().foreach(println)

        sc.stop()
        
    }

5.3 Разница между кэшированием и контрольными точками

  • Кэш кэша только сохраняет данные и не отсекает кровные зависимости. Контрольно-пропускные пункты отсекают кровные зависимости.
  • Данные, закэшированные Cache, обычно хранятся в таких местах, как диски и память, и их надежность низкая. Данные контрольных точек обычно хранятся в отказоустойчивых файловых системах с высокой доступностью, таких как HDFS, с высокой надежностью.
  • Рекомендуется использовать кэш-память для RDD функции checkpoint(), чтобы заданию контрольной точки нужно было только считывать данные из кэш-памяти, в противном случае RDD необходимо пересчитывать с нуля.

6. Разделитель RDD

В настоящее время Spark поддерживает разделы Hash, Range и определяемые пользователем разделы. Раздел Hash является текущим разделом по умолчанию. Разделитель напрямую определяет количество разделов в RDD, в какой раздел входит каждый фрагмент данных в RDD после перемешивания, а затем определяет количество сокращений.

  • Только RDD типа Key-Value имеют разделители, а значение разделов RDD типов, отличных от Key-Value, равно None.
  • Диапазон идентификаторов разделов каждого RDD: 0 ~ (numPartitions - 1), который определяет, к какому разделу принадлежит это значение.

6.1 Хэш-раздел: для заданного ключа вычислите его хэш-код и разделите на количество разделов, чтобы получить остаток.

  • HashPartitionerСмотрите конкретный код

6.2 Раздел диапазона: сопоставьте определенный диапазон данных с одним разделом, постарайтесь убедиться, что данные в каждом разделе однородны, а разделы упорядочены.

  • RangePartitionerКонкретный код можно посмотреть самостоятельно

6.3 Пользовательский раздел

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6), ("c", 6), ("d", 6)
        ),5 )

        val parRDD = rdd.partitionBy(new MyPartitioner)

        val reduceRdd1 = parRDD.reduceByKey((i,j)=>{
            println("reduceRdd1")
            i+j
        })
        val reduceRdd2 = reduceRdd1.reduceByKey((i,j)=>{
            println("reduceRdd2")
            i+j
        })

        reduceRdd2.saveAsTextFile("out1")

        sc.stop()


    }

    /**
     * 实现把
     * a   分到0区
     * b   分到1区
     * 其他 分到2区
     */
    class MyPartitioner extends Partitioner {
        /**
         * numPartitions 和 getPartition 是必须重写的方法
         */
        override def numPartitions: Int = 3
        override def getPartition(key: Any): Int = key match {
            case "a" => 0
            case "b" => 1
            case _ => 2
        }

        /**
         * equals 和 hashCode 可选
         */
        override def equals(other: Any): Boolean = other match {
            case h: HashPartitioner =>
                h.numPartitions == numPartitions
            case _ =>
                false
        }
        override def hashCode: Int = numPartitions

    }

7. Чтение и сохранение файла RDD

Чтение и хранение данных в Spark можно разделить на два аспекта: формат файла и файловая система. Формат файла делится на: текстовый файл, файл csv, файл последовательности и объектный файл; Файловая система делится на: локальную файловую систему, HDFS, HBASE и базу данных.

7.1 текстовые файлы

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /**
         * text文件
         */
        rdd.saveAsTextFile("text")
        sc.textFile("text").collect().foreach(println)

        sc.stop()
    }

7.2 Файл последовательности

Файл SequenceFile — это плоский файл (Flat File), разработанный Hadoop для хранения пар ключ-значение в двоичной форме. В SparkContext вы можете позвонитьsequenceFile[keyClass, valueClass](path).

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /**
         * Sequence文件
         */
        rdd.saveAsSequenceFile("sequence")
        sc.sequenceFile[String, Int]("sequence").collect().foreach(println)

        sc.stop()
    }

7.3 объект объектный файл

Объектный файл — это файл, сохраняемый после сериализации объекта с использованием механизма сериализации Java. в состоянии пройтиobjectFile[T: ClassTag](path)Функция получает путь, читает объектный файл и возвращает соответствующий RDD.Его также можно вызвать, вызвавsaveAsObjectFile()Реализует вывод в объектные файлы. Поскольку это сериализация, вам нужно указать тип.

def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3)
        ), 1)

        /**
         * object文件
         */
        rdd.saveAsObjectFile("object")
        sc.objectFile[(String, Int)]("object").collect().foreach(println)

        sc.stop()
    }