Три основные структуры данных основного программирования Spark

задняя часть Spark
Три основные структуры данных основного программирования Spark

Это третий день моего участия в Gengwen Challenge.Подробности о мероприятии, пожалуйста, проверьте:Обновить вызов

вводить

Для обеспечения высокого параллелизма и обработки данных с высокой пропускной способностью вычислительная среда Spark инкапсулирует три основные структуры данных для обработки различных сценариев приложений. Три структуры данных:

  • RDD: устойчивый распределенный набор данных
  • Аккумулятор: распределенные общие переменные только для записи
  • Широковещательные переменные: распределенные общие переменные только для чтения

1. СДР

1. Что такое СДР

RDD (Resilient Distributed Dataset) называется Resilient Distributed Dataset и является самой базовой моделью обработки данных в Spark. Код представляет собой абстрактный класс, представляющий эластичную, неизменяемую, разделимую коллекцию, элементы которой можно вычислять параллельно.

  • эластичность
    • Гибкость хранения: автоматическое переключение между памятью и диском;
    • Отказоустойчивая отказоустойчивость: потеря данных может быть автоматически восстановлена;
    • Вычислительная гибкость: механизм повтора ошибки вычисления;
    • Эластичность сегментирования: при необходимости можно выполнить повторное сегментирование.
  • Распределенный: данные хранятся на разных узлах кластера больших данных.
  • Набор данных: RDD инкапсулирует логику вычислений и не сохраняет данные.
  • Абстракция данных: RDD — это абстрактный класс, который должен быть реализован с помощью подклассов.
  • Неизменяемый: RDD инкапсулирует логику вычислений и не может быть изменен. Если вы хотите изменить, вы можете только создать новый RDD. Логику вычислений, инкапсулированную в новом RDD, можно разделить и распараллелить.

2. Основные атрибуты

 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
  • Список разделов
    • Наличие в RDD структуры данных списка разделов для выполнения задачи параллельных вычислений является важным атрибутом для достижения распределенных вычислений.
  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]
  • Функция расчета раздела
    • Когда Spark вычисляет, он использует функцию раздела для расчета каждого раздела.
  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

3. Принцип выполнения

С вычислительной точки зрения для обработки данных требуются вычислительные ресурсы (память и ЦП) и вычислительные модели (логика). Во время выполнения вычислительные ресурсы и вычислительные модели должны быть скоординированы и интегрированы. Когда среда Spark выполняется, она сначала обращается за ресурсами, а затем последовательно разбивает логику обработки данных приложения на вычислительные задачи. Затем задача отправляется на вычислительный узел, выделивший ресурсы, и данные рассчитываются по заданной вычислительной модели. Наконец получить результат расчета. RDD — это основная модель обработки данных в среде Spark.Далее давайте посмотрим, как RDD работает в среде Yarn: 1) Запустите среду кластера Yarn

image.png2) Spark создает узлы планирования и вычислительные узлы, запрашивая ресурсы.

image.png3) Фреймворк Spark разделяет вычислительную логику на разные задачи в соответствии с разделом в соответствии с требованиями.

image.png4) Узел планирования задач для вычислительного узла, соответствующего узлу, вычисляется на основе вычисленного состояния.

image.pngИз вышеуказанного процесса можно увидеть, что RDD в основном используется для инкапсулирования логики во всем процессе и генерировать задачу и отправлять его в узел Исполнителя для выполнения расчета. Далее давайте посмотрим, как RDD в Spark Framework выполняет данные обработка.

4. Базовое программирование

Базовое программирование RDD трех основных структур данных основного программирования Spark (1)

Базовое программирование RDD трех основных структур данных основного программирования Spark (2)

2. Аккумулятор

1. Принцип реализации

Аккумулятор используется для агрегирования переменной информации на стороне Исполнителя на стороне Водителя. Для переменных, определенных в программе Драйвера, каждая Задача на стороне Исполнителя получит новую копию этой переменной.После того как каждая задача обновит значения этих копий, она будет возвращена на сторону Драйвера для слияния.

2. Системный аккумулятор (3 типа)

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        //数字累加
        val sum = sc.longAccumulator("sum")
        //集合累加
        //sc.collectionAccumulator("xx")
        //浮点累加
        //sc.doubleAccumulator("xxx")

        rdd.foreach(sum.add(_))

        println(s"sum = ${sum.value}")

        sc.stop()
    }

3. Пользовательский аккумулятор

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            "java scala js golang", "golang python java", "spark hadoop"
        ), 2)

        val myAccumulator = new MyAccumulator
        //注册累加器
        sc.register(myAccumulator)

        rdd.flatMap(_.split(" ")).foreach(myAccumulator.add)

        println(s"sum = ${myAccumulator.value}")

        sc.stop()
    }

    class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

        private var map: mutable.Map[String, Int] = mutable.Map[String, Int]()

        /**
         * 执行顺序
         * 1。copy       先复制
         * 2。reset      然后重置
         * 3。isZero     判断是否初始化状态
         */

        /**
         * 判断为初始化状态
         *
         * @return 返回 false 会抛出异常
         */
        override def isZero: Boolean = map.isEmpty

        /**
         * 复制累加器对象
         *
         * @return
         */
        override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccumulator

        /**
         * 重制
         */
        override def reset(): Unit = map.clear()

        /**
         * 分区内累加
         *
         * @param v
         */
        override def add(v: String): Unit = map.update(v, map.getOrElse(v, 0) + 1)

        /**
         * 分区间累加
         *
         * @param other
         */
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
            map = map.foldLeft(other.value)((map, kv) => {
                map.update(kv._1, map.getOrElse(kv._1, 0) + kv._2)
                map
            })
        }

        /**
         * 取结果
         *
         * @return
         */
        override def value: mutable.Map[String, Int] = map
    }

3. Широковещательные переменные

1. Принцип реализации

Широковещательные переменные используются для эффективного распределения больших объектов. Отправляет большое значение только для чтения всем рабочим узлам для использования одной или несколькими операциями Spark. Например, если вашему приложению необходимо отправить большую таблицу поиска, доступную только для чтения, всем узлам, можно легко использовать широковещательные переменные. Используйте одну и ту же переменную в нескольких параллельных операциях, но Spark будет отправлять ее отдельно для каждой задачи.

2. Код

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext(
            new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
        )

        val rdd = sc.makeRDD(List(
            1, 2, 5, 9
        ), 2)

        val dict = Map((1, "前端"), (2, "后端"), (3, "大数据"), (5, "UI"), (6, "客户端"), (9, "产品"))
        //广播
        sc.broadcast(dict)

        rdd.map(id => (id, dict.getOrElse(id, "未知")))
            .collect()
            .foreach(println)

        sc.stop()
    }