Заметки по началу работы со Spark

Spark

Исходный код этой статьи основан на spark 2.2.0.

Базовые концепты

Application

Программа Spark, написанная пользователем, выполняется через класс с основным методом для завершения обработки вычислительной задачи. Он состоит из программы-драйвера и набора исполнителей, работающих в кластере Spark.

RDD

Устойчивые распределенные наборы данных. RDD — это основная структура данных Spark, которой можно манипулировать с помощью ряда операторов. Когда RDD встречает оператор Action, все предыдущие операторы формируются в направленный ациклический граф (DAG). Затем преобразуйте его в задание в Spark и отправьте в кластер для выполнения.

Использовать DataFrame/DateSet после spark2.x

SparkContext

SparkContext — это точка входа Spark, отвечающая за подключение к кластерам Spark, создание RDD, накопление и трансляцию и т. д. По сути, SparkContext — это внешний интерфейс Spark, отвечающий за предоставление вызывающей стороне различных функций Spark.

SparkContext在Spark中的主要功能
Программа-драйвер подключается к диспетчеру кластера через SparkContext для управления задачами в кластере. Настройка параметров конфигурации Spark и управление SQLContext, HiveContext и StreamingContext также выполняются через SparkContext.

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
Для каждой JVM существует только один SparkContext, и один сервер может запускать несколько JVM.

SparkSession

The entry point to programming Spark with the Dataset and DataFrame API.
Содержит sqlcontext и hivecontext

Driver

Процесс виртуальной машины Java, выполняющий основной метод, отвечает за мониторинг связи и соединения, отправляемого процессом-исполнителем искрового приложения, и за отправку файла jar проекта всем процессам-исполнителям.
Драйвер взаимодействует с Мастером и Рабочим для завершения запуска процесса приложения, разделения DAG, инкапсуляции вычислительных задач, назначения задач исполнителям, выделения вычислительных ресурсов и т. д. для планирования и выполнения заданий и т. д.
Драйвер планирует задачи, которые должны выполняться исполнителем, поэтому для драйвера лучше всего находиться в сети со искровым кластером для связи с ним.
Процесс драйвера обычно находится на рабочем узле, а не на том же узле, что и диспетчер кластера.

Менеджер кластера действует на весь кластер saprk (распределение ресурсов кластера), на все приложения, а Драйвер действует на приложение (координирует ресурсы, которые были выделены для приложения), и уровень управления отличается

Worker

Рабочие узлы в кластере, запускают и запускают процессы-исполнители, узлы, выполняющие код заданий
В автономном режиме: узел, на котором находится рабочий процесс.
В режиме пряжи: узел, в котором находится процесс nodemanager пряжи.

Executor

Работает на рабочих узлах, отвечает за операции миссии и данные, хранящиеся в памяти или на диске.
Каждое приложение spark имеет свой собственный процесс-исполнитель, и приложение spark не использует общий процесс-исполнитель.

В параметрах запуска естьexecutor-cores,executor-memoryКаждый Executor будет занимать ядро ​​​​процессора и память, и приложение spark не будет повторно использовать Executor, это легко вызвать нехватку рабочих ресурсов.

исполнители могут быть динамически добавлены/выпущены в течение всего жизненного цикла приложения spark, см.Динамическое распределение ресурсоводин период
Исполнитель использует несколько потоков для выполнения задач, назначенных SparkContext, и выполняет пакет задач по мере их поступления.

Job

Зажигание может быть разделено на несколько заданий,Каждый раз, когда действие вызывается, задание будет сгенерировано логически, а задание содержитодин или несколько этапов.

Stage

Каждое задание будет разделено на один или несколько этапов (стадий), каждая стадия будет иметь номер, соответствующий задаче (т.

Этап включает две категории: ShuffleMapStage и ResultStage.Если пользовательская программа вызывает оператор, которому необходимо выполнить вычисления в случайном порядке, например groupByKey, он будет разделен на ShuffleMapStage и ResultStage с Shuffle в качестве границы.
Если тасование не выполняется, остается только один этап

TaskSet

Набор задач, которые связаны между собой, но не имеют зависимостей в случайном порядке друг от друга. Этап можно напрямую сопоставить с набором задач, набор задач инкапсулирует задачу, которую нужно выполнить один раз и которая имеет одинаковую логику обработки. Эти задачи можно вычислять параллельно. и крупнозернистое планирование.Он находится в модуле TaskSet.

Стадия, соответствующая набору задач

Task

Драйвер отправляет его в вычислительный блок, выполненный в исполнителе. Каждая задача отвечает за обработку небольшого произведения данных на сцене и расчет соответствующего результата.
Задача — это основная единица, работающая на физических узлах.Задача включает две категории: ShuffleMapTask и ResultTask, которые соответствуют одной из основных единиц выполнения ShuffleMapStage и ResultStage в Stage соответственно.
Существует однозначное соответствие между InputSplit-task-partitions, и Spark будет запускать задачу для каждой секции для обработки (см. эту статью).Точка знаний — взаимосвязь между количеством узлов в кластере Spark, количеством разделов RDD, количеством ядер ЦП и степенью параллелизмаодин период)
Установите вручную количество задачspark.default.parallelism

Cluster Manager

Менеджер кластера, файловое приложение в планировании кластера и выделение ресурсов для каждого компонента искры, например Spark Standalone, YARN, Mesos и т. д.

Deploy Mode

Независимо от того, является ли он автономным или пряжей, он делится на два режима: клиентский и кластерный.Разница в том, чтокуда бежит водитель
В клиентском режиме драйвер запускается на машине, отправляющей искровое задание, и вы можете просматривать подробную информацию журнала в режиме реального времени, что удобно для отслеживания и устранения ошибок, а также используется для тестирования.
В режиме кластера приложение Spark отправляется диспетчеру кластера, а диспетчер кластера отвечает за запуск процессов драйвера на узле в кластере.

Обычно драйвер и воркер лучше всего в одной сети, а клиент скорее всего драйвер и воркер устроены отдельно, поэтому сетевое общение занимает очень много времени, а у кластера такой проблемы нет.

автономный режим

Мастер для управления кластером
Мастер-процесс и кластер рабочего процесса могут не нуждаться ни в Yarn-кластерах, ни в HDFS.

Master

В автономном режиме — диспетчер кластера (Cluster Manager) — компонент, планирующий и распределяющий ресурсы в кластере для каждого приложения spark.

Обратите внимание на разницу и драйвер, то есть разницу между Cluster Manager и драйвером.

режим пряжи

пряжа для управления кластером
Кластер, состоящий из процесса ResourceManager и процесса NodeManager

DAGScheduler

Создайте группу обеспечения доступности баз данных на основе этапа в соответствии с заданием и отправьте этап в планировщик задач.

TaskScheduler

Отправьте набор задач в кластер рабочих узлов для запуска и возврата результата.

spark组件-百度脑图

Основной принцип работы искры

Водитель применяет ресурсы для мастера;
Мастер просит Работника назначить конкретного Исполнителя на программу
Драйвер передает разделенную Задачу Исполнителю, который является кодом бизнес-логики нашей программы Spark.

Генерация заданий, разделение этапов и распределение задач происходят на стороне водителя Да

Spark VS MapReduce

Самая большая разница между Spark и MapReduce: итеративные вычисления

  • MapReduce
    Задание разделено на два этапа: сопоставление и уменьшение, и оно завершается после обработки двух этапов.
  • Spark
    Его можно разделить на n этапов, которые являются итеративными по памяти.

RDD

Полное название — Resilient Distributed Dataset, то есть эластичный распределенный набор данных.
Обеспечивает отказоустойчивость, может автоматически пересчитывать исходные данные и восстанавливаться после сбоев узлов.
По умолчанию в памяти, если памяти недостаточно, запись на диск
Распространяется RDD, данные распределяются по группе узлов, и каждый узел хранит часть раздела RDD.

Если память RDD недостаточно, она будет автоматически записана на диск. Вызов Cache () и сохраняются () хранит данные RDD в соответствии с Storelevel.

создание СДР

  1. SparkContext.wholeTextFiles()Для большого количества небольших файлов в каталоге верните<filename,fileContent>ПараRDD
  2. SparkContext.sequenceFile[K,V]()Для SequenceFile могут быть созданы RDD, а универсальные типы K и V являются типами ключа и значения SequenceFile. Требования K и V должны быть типами сериализации Hadoop, такими как IntWritable, Text и т. д.
  3. SparkContext.hadoopRDD()RDD могут быть созданы для пользовательских типов ввода для Hadoop. Этот метод получает класс JobConf, InputFormatClass, Key и Value.
  4. SparkContext.objectFile()метод, который можно вызвать передRDD.saveAsObjectFile()Созданный объект сериализует файл, десериализует данные в файле и создает RDD.

Создайте RDD параллельно
перечислитьparallelize()метод, вы можете указать, на сколько разделов разделить коллекцию (на самом деле он должен указать количество InputSplit, InputSplit-task-partition), Spark запустит задачу для каждого обрабатываемого раздела (см.Связь между количеством узлов, количеством разделов RDD, количеством ядер ЦП в кластере Spark и степенью параллелизма.один период)
Официальная рекомендация Spark создавать от двух до четырех разделов в кластере на каждый ЦП, чтобы избежать нагрузки на ЦП.

Если кластер работает несколько задач, в том числе Spark Hadoop Task, будь то задача CPU Core вычислений 2-4 для настройки?

Трансформация и действие

Transformation

Создайте новый RDD против существующего RDD
трансформация имеетlazyФункции, которые просто записывают операции, выполненные в RDD, но не выполняются спонтанно. Только после операции Action будут выполнены все преобразования, что позволит избежать получения слишком большого количества промежуточных результатов.

действовать вводить
map Передайте каждый элемент в RDD в пользовательскую функцию, получите новый элемент, а затем используйте новый элемент, чтобы сформировать новый RDD
filter Судя по каждому элементу в RDD, если он возвращает значение true, он сохраняется, а если он возвращает значение false, он удаляется.
flatMap Похоже на карту, он сплющен после отображения
gropuByKey Сгруппировать по ключу, каждый ключ соответствует Iterable
reduceByKey Выполните операцию уменьшения значения, соответствующего каждому ключу.
sortByKey Отсортируйте значение, соответствующее каждому ключу.
join Выполните операцию соединения на двух RDDS, содержащих пары . Пара на каждом ключее соединение будет передана на пользовательскую функцию для обработки.
cogroup То же, что и соединение, но Iterable, соответствующий каждому ключу, будет передан в пользовательскую функцию для обработки.

Разница между картой и плоской картой
map выполняет функциональные операции над элементами в rdd один за другим и сопоставляет их с другим rdd.
flatMap работает с каждым элементом в коллекции, а затем сглаживает его. Часто используется для разделения слов

Эксперимент: будет ли flatMap сглаживать многоуровневые вложенные элементы
Экспериментальный вывод: выполняйте операцию выравнивания только для следующего слоя и не возвращайтесь к выполнению операции выравнивания.

val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(print)  //A1B2C3

val arr2 = sc.parallelize(Array(
                              Array(
                                ("A", 1), ("B", 2), ("C", 3)),
                              Array(
                                ("C", 1), ("D", 2), ("E", 3)),
                              Array(
                                ("F", 1), ("G", 2), ("H", 3))))
arr2.flatMap(x => x).foreach(print)  //(A,1)(B,2)(C,3)(C,1)(D,2)(E,3)(F,1)(G,2)(H,3)

val arr3 = sc.parallelize(Array(
                              Array(
                                Array(("A", 1), ("B", 2), ("C", 3))),
                              Array(
                                Array(("C", 1), ("D", 2), ("E", 3))),
                              Array(
                                Array(("F", 1), ("G", 2), ("H", 3)))))
arr3.flatMap(x => x).foreach(print)  //[Lscala.Tuple2;@11074bf8 [Lscala.Tuple2;@c10a22d [Lscala.Tuple2;@40ef42cd

исходный код карты и flatMap

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    //直接遍历元素,对元素应用f方法
    def next() = f(self.next())
  }

  /** Creates a new iterator by applying a function to all values produced by this iterator
   *  and concatenating the results.
   *
   *  @return  the iterator resulting from applying the given iterator-valued function
   *           `f` to each value produced by this iterator and concatenating the results.
   */
  def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
    private var cur: Iterator[B] = empty
    //这一步只是取当前元素的Iterator,没有递归往下层取
    private def nextCur() { cur = f(self.next()).toIterator }
    def hasNext: Boolean = {
      while (!cur.hasNext) {
        if (!self.hasNext) return false
        nextCur()
      }
      true
    }
    //在调用next方法时,最终会调用到nextCur方法
    def next(): B = (if (hasNext) cur else empty).next()
  }

join VS cogroup VS fullOuterJoin VS leftOuterJoin VS rightOuterJoin

val studentList = Array(
  Tuple2(1, "leo"),
  Tuple2(2, "jack"),
  Tuple2(3, "tom"));
val scoreList = Array(
  Tuple2(1, 100),
  Tuple2(2, 90),
  Tuple2(2, 90),
  Tuple2(4, 60));
val students = sc.parallelize(studentList);
val scores = sc.parallelize(scoreList);
/*
 * (4,(CompactBuffer(),CompactBuffer(60)))
 * (1,(CompactBuffer(leo),CompactBuffer(100)))
 * (3,(CompactBuffer(tom),CompactBuffer()))
 * (2,(CompactBuffer(jack),CompactBuffer(90, 90)))
 */
val studentCogroup = students.cogroup(scores)   //union key数组延长
/*
 * (1,(leo,100))
 * (2,(jack,90))
 * (2,(jack,90))
 */
val studentJoin = students.join(scores) //交集
/*
 * (4,(None,Some(60)))
 * (1,(Some(leo),Some(100)))
 * (3,(Some(tom),None))
 * (2,(Some(jack),Some(90)))
 * (2,(Some(jack),Some(90)))
 */
val studentFullOuterJoin = students.fullOuterJoin(scores) //some可为空 union
/*
 * (1,(leo,Some(100)))
 * (3,(tom,None))
 * (2,(jack,Some(90)))
 * (2,(jack,Some(90)))
 */
val studentLeftOuterJoin = students.leftOuterJoin(scores) //左不为空
/*
 * (4,(None,60))
 * (1,(Some(leo),100))
 * (2,(Some(jack),90))
 * (2,(Some(jack),90))
 */
val studentRightOuterJoin = students.rightOuterJoin(scores) //右不为空

Action

Выполнение заключительных операций над RDD, таких как обход, сокращение, сохранение и т. д.,Запустите операцию вычисления и верните значение в программу пользователя или запишите данные во внешнее хранилище.
встречаЗапустить задание искры для запуска, тем самым запуская выполнение всех преобразований до этого действия Для RDD Tuple2, которые работают с парами ключ-значение, например groupByKey, scala реализуется путем неявного преобразования в PairRDDFunction, а затем предоставляется соответствующий метод groupByKey.Необходимо вручную импортировать соответствующее неявное преобразование Spark.import org.apache.spark.SparkContext._

Для groupByKey saprk2.2 явно использует HashPartitioner, не видит неявного преобразования в PairRDDFunction Операция Action должна возвращать результат драйверу? Да, см. метод runJob ниже.

Рабочие характеристики действия
Операция действия должна вызываться в исходном коде.runJob()Метод, это может быть прямо или косвенно

    //直接调用了runJob
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  
  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param resultHandler callback to pass each result to
   */
   //会把结果传递给handler function,handle function就是对返回结果进行处理的方法
   //如上文的collect方法的handler function就是 (iter: Iterator[T]) => iter.toArray
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
действовать вводить
reduce Агрегируйте все элементы в RDD. Первый и второй элементы агрегируются, значение агрегируется с третьим элементом, значение агрегируется с четвертым элементом и так далее.
collect В RDD все элементы попадают к локальному клиенту. Обратите внимание, что проблемы с передачей данных,spark.driver.maxResultSizeВы можете ограничить максимальное количество наборов результатов, возвращаемых оператором действия драйверу.
count Получите общее количество элементов RDD.
take(n) Получите первые n элементов в RDD.
saveAsTextFile Сохраните элементы RDD в файл, вызвав метод toString для каждого элемента.
countByKey Подсчитайте значение, соответствующее каждому ключу.
foreach Перебрать каждый элемент в RDD.
//从本地文件创建
val lines = spark.sparkContext.textFile("hello.txt")
//Transformation,返回(key,value)的RDD
val linePairs = lines.map(line => (line, 1))
//Transformation,隐式装换为PairRDDFunction,提供reduceByKey等方法
//源码中是用HashPartitioner
val lineCounts = linePairs.reduceByKey(_ + _)
//Action,发送到driver端执行
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + lineCount._2 + " times."))

mapPartitions

карта: обрабатывать один раздел за разодинданные
mapPartitions: обрабатывать один раздел за развсеДанные
используемые сцены:
Объем данных RDD не особенно велик.Рекомендуется использовать оператор mapPartitions вместо оператора карты, что может ускорить скорость обработки.Если объем данных RDD особенно велик, не рекомендуется использовать mapPartitions, что может привести к переполнению памяти.

val studentScoresRDD = studentNamesRDD.mapPartitions { it =>
    var studentScoreList = Array("a")
    while (it.hasNext) {
      ...
    }
    studentScoreList.iterator
}

mapPartitionsWithIndex: добавлен индекс раздела

studentNamesRDD.mapPartitionsWithIndex{(index:Int,it:Iterator[String])=>
      ...
 }

другие операторы

  1. Образец: принимать образцы в пропорции, операции преобразования
  2. takeSample: взять образцы по номеру, операция действия
  3. декартово: декартово произведение
  4. объединение: сокращает разделы RDD и сжимает данные в меньшее количество разделов.
    Сценарий использования: если данные во многих разделах неравномерны (например, после фильтрации), можно использовать объединение для сжатия количества разделов rdd, чтобы данные в каждом разделе были более компактными.
    rdd.coalesce(3): сжато в 3 раздела

Разница между объединением и перераспределением
repartition — это упрощенная версия слияния

/**
 * 返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖
 * 例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。
 * 然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)
 * 为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T] = withScope {...}
/**
 * 返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。
 * 在内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}

Пусть количество разделов RDD изменится с N на M

количество разделов shuffle = true shuffle = false
N < M Разделы N имеют неравномерное распределение данных, используйте функцию HashPartitioner, чтобы переразбить данные на разделы M. Объединение недействительно, нет процесса перемешивания, существует узкая зависимость между родительским RDD и дочерним RDD
N > M Объедините несколько разделов N в новый раздел и, наконец, объедините разделы M
N >> M Shuffle = true, еще одно перемешивание в процессе перераспределения, вышестоящие разделы могут выполняться параллельно, так что операции перед объединением имеют лучший параллелизм. Родительско-дочерний RDD является узкой зависимостью, на этом же этапе параллелизм программы Spark может быть недостаточным (вычисление выполняется на нескольких узлах кластера), что влияет на производительность.
  1. вернутьуменьшатьДля нового RDD с M разделами это приведет к узким зависимостям, и не произойдет перемешивание
  2. вернутьУвеличиватьДля нового RDD из M разделов произойдет перетасовка
  3. Если перетасовка ложна, N

постоянство RDD

кэш () и сохранять ()

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

Если вам нужно очистить кэш из памяти, вы можете использовать метод unpersist().

class StorageLevel private(
    private var _useDisk: Boolean,  //磁盘
    private var _useMemory: Boolean,//内存
    private var _useOffHeap: Boolean,//内存满就存磁盘
    private var _deserialized: Boolean,//序列化储存
    private var _replication: Int = 1)//冗余备份,默认1,只自己储存一份
  extends Externalizable {
Уровень сохраняемости значение
MEMORY_ONLY Сохраняться в памяти JVM как несериализованный объект Java. Если память не может полностью хранить все разделы RDD, тогда разделы, которые не являются постоянными, будут использоваться в следующий раз, когда они должны быть использованы.пересчитано.
MEMORY_AND_DISK Как указано выше, но когда определенный раздел не может быть сохранен в памяти, он будет сохраняться на диск. В следующий раз, когда вам нужно использовать эти разбиения, нужно прочитать с диска.
MEMORY_ONLY_SER С Memory_only используется последовательность Java, а объект Java сериализуется и длится. Настройка памяти может быть уменьшена, но необходимо выполнить анти-секвенирование, поэтому накладные расходы CPU можно увеличить.
MEMORY_AND_DSK_SER То же, что и MEMORY_AND_DSK. Но используйте сериализацию для сохранения объектов Java.
DISK_ONLY Сохранение с использованием несериализованных объектов Java, полностью хранящихся на диске.
MEMORY_ONLY_2 MEMORY_AND_DISK_2 и т. д. Если в конце добавляется уровень сохраняемости 2, это означает, что постоянные данные будут повторно использоваться и сохраняться на других узлах, так что при потере данных нет необходимости пересчитывать, а нужно только данные резервной копии. использовал.

Приоритизация (память в первую очередь)

  1. MEMORY_ONLY
  2. MEMORY_ONLY_SER, сериализовать данные для хранения
  3. DISK

общая переменная

  1. дефолт
    Если в функции оператора используется внешняя переменная, скопируйте переменную вкаждыйзадача, и в этом случае каждая задача может управлять только своей собственной долейкопия переменной
  2. Широковещательная переменная
    Переменные будут использоваться для каждогоузелСкопируйте копию (не каждую задачу), уменьшите передачу по сети и потребление памяти
    переменная только для чтения
  3. Аккумулятор (накопительные переменные)
    Пусть несколько задач совместно обрабатывают переменную, в основном для операций накопления
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val factorBroadcast = sc.broadcast(3)
val sumAccumulator = new DoubleAccumulator()
//Accumulator must be registered before send to executor
sc.register(sumAccumulator)

val multipleRdd = rdd.map(num => num * factorBroadcast.value)
//不能获取值,只能在driver端获取
val accumulator = rdd.map(num2 => sumAccumulator.add(num2.toDouble))
//action:3,6,9,12,15
multipleRdd.foreach(num => println(num))
//要先执行action操作才能获取值
accumulator.collect()  //15
println(sumAccumulator.value) 
accumulator.count()    //30,再次加15
println(sumAccumulator.value)

архитектура ядра искры

В автономном режиме

Spark架构原理-standalone模式下

TaskScheduler отправляет каждую задачу в наборе задач исполнителю для выполнения.

spark内部组件

Широкие и узкие зависимости

Узкая зависимость: раздел каждого родительского RDD используется не более чем одним разделом дочернего RDD.
Широкая зависимость: каждый родительский раздел RDD используется несколькими дочерними разделами RDD.

разница:

  1. Он обеспечивает узкую зависимость от узла кластера в трубной основе (трубопровод), рассчитанный для всех родительских раздела. Например, выполняется элемент-мудрый карта, затем фильтрация операции;
    Широкие зависимости требуют, чтобы все данные родительского раздела сначала вычислялись, а затем перетасовывались между узлами, что аналогично MapReduce.
  2. Узкие зависимости могут эффективнее восстанавливать отказавшие узлы, то есть необходимо пересчитывать только родительский раздел потерянного раздела RDD, а разные узлы можно вычислять параллельно;
    Для графа происхождения с широкими зависимостями отказ одного узла может привести к тому, что все предки этого RDD потеряют некоторые разделы, что потребует общего пересчета.

宽依赖与窄依赖

режим подачи искры

  1. режим ядра spark/автономный режим: мастер-рабочий кластер на основе spark
  2. Режим кластера пряжи на основе пряжи
  3. Режим клиента пряжи на основе пряжи

установить в сценарии отправки искры
--master значение параметра: yarn-cluster/yarn-client
По умолчанию стоит автономный режим

Скрипт отправки искры

/usr/local/spark/bin/spark-submit \
--class com.feng.spark.spark1.StructuredNetworkWordCount \
--master spark://spark1:7077 \ #standalone模式
--num-executors 3 \     // #分配3个executor
--driver-memory 500m \  
--executor-memory 500m \    # //每个executor500m内存
--executor-cores 2 \    // # 每个executor2个core
/usr/local/test_data/spark1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

Все приложение требует3*500=1500mОЗУ,3*2=6Ядро
--master local[8]При реализации процесса восемь потоков для имитации кластера:
--total-executor-cores: Укажите общее количество ядер процессора для всех исполнителей.
--supervise: Указан узел драйвера искрового мониторинга.Если драйвер зависнет, то драйвер будет перезапущен автоматически

Метод конфигурации

От высшего к низшему порядку по приоритету

  1. SparkConf: Через настройки программы при запуске в локальном режиме в редакторе можно задать только свойства в SparkConf
  2. команда сценария spark-submit: текущее приложение допустимо, рекомендуется
  3. Spark-Defaults.Conf Файл: Global
SparkConf.set("spark.default.parallelism", "100")
spark-submit: --conf spark.default.parallelism=50
spark-defaults.conf: spark.default.parallelism 10

В сценарии spark-submit вы можете использовать --verbose для вывода подробной информации о свойствах конфигурации.

Сначала вы можете создать пустой объект SparkConf в программе, например

val sc = new SparkContext(new SparkConf())

Затем Spark - отправьте скрипт, используя--confЗадайте значения свойств, например

--conf spark.eventLog.enabled=false

управление зависимостями

--jars: Дополнительные зависимые пакеты jar будут автоматически отправлены в кластер.
Укажите связанную банку:

  1. файл: поддерживается файловой службой http драйвера, все исполнители будут извлекать файлы через службу HTTP драйвера.
  2. hdfs:/http:/https:/ftp: извлекать напрямую из указанного места в соответствии с URI
  3. локальный: файлы в этом формате должны существовать на каждом рабочем узле, поэтому нет необходимости извлекать файлы через сеть io. Он подходит для особенно больших файлов или пакетов jar и может повысить производительность выполнения заданий.

Файлы и jar-файлы копируются в рабочий каталог каждого экзекьютора, что занимает много места на диске, поэтому эти файлы нужно потом очистить
При запуске искровых заданий на пряже очистка файлов зависимостей выполняется автоматически.
Используйте автономный режим, вам нужно настроитьspark.worker.cleanup.appDataTtlсвойства, чтобы включить автоматическую очистку зависимых файлов и пакетов jar

Связанные параметры см.параметры conf/spark-evnshодин период

--packages: Привязать зависимости maven
--repositories: привязать дополнительные репозитории

режим кластера пряжи

используется дляСпособ производства, драйвер работает на nodeManager, проблемы с всплеском трафика сетевой карты нет, но лог смотреть хлопотно, а отлаживать неудобно

基于YARN的提交模式-yarn-cluster

режим клиента пряжи

yarn-clientдля тестирования, Водитель работает на местном клиенте, отвечает за приложение планирования, пряжа будет иметь много сетевого трафика Super-кластера, что приводит кВсплеск трафика сетевой карты
yarn-client может видеть все логи локально, что удобно для отладки

基于YARN的提交模式-yarn-client

  1. В yarn-client драйвер запускается на машине, представленной spark-submit, а ApplicationMaster эквивалентен только ExecutorLauncher, который отвечает только за подачу заявки на запуск исполнителя; драйвер отвечает за конкретное планирование
  2. В пряже-кластере ApplicationMaster является драйвером, а ApplicationMaster отвечает за конкретное планирование.

Автономный процесс взаимодействия основных компонентов

Смотрите такжеКраткий анализ искровой архитектуры

Основные выводы:

  1. Приложение запустит драйвер
  2. Драйвер отвечает за отслеживание и управление всеми состояниями ресурсов и задач во время работы приложения.
  3. Водитель будет управлять группой Исполнителей
  4. Исполнитель выполняет только Задачи, принадлежащие одному Водителю

standalone核心组件交互流程

  • Оранжевый: отправить пользовательскую программу Spark.
    Пользователь отправляет программу Spark, и основной процесс выглядит следующим образом:

    1. Пользовательский сценарий spark-submit отправляет программу Spark, которая создаст объект ClientEndpoint, отвечающий за связь с Мастером.
    2. ClientEndpoint отправляет мастеру сообщение RequestSubmitDriver, указывающее, что пользовательская программа отправлена
    3. Мастер получает сообщение RequestSubmitDriver и отвечает на ClientEndpoint сообщением SubmitDriverResponse, указывающим, что пользовательская программа завершила регистрацию.

    4,5 комбинация, должна быть программа, которую пользователь прописал в мастере, но драйвер может не запуститься

    1. Отправить сообщение мастеру ClientEndpoint RequestDriverStatus, запрос состояния драйвера

    MasterEndPoint должен возвращать ответ, аналогичный DriverStatusResponse to DriverClient? Периодический ответ, когда он узнает, что драйвер был запущен, это приведет к 5

    1. Если драйвер, соответствующий текущей пользовательской программе, был запущен, ClientEndpoint напрямую завершает работу и завершает отправку пользовательской программы.
  • Фиолетовый: запустите процесс драйвера.
    После того, как пользователь отправит пользовательскую программу Spark, необходимо запустить драйвер для обработки логики расчета пользовательской программы и выполнения задачи расчета.В это время Мастеру необходимо запустить драйвер:

    1. Приложение задачи, отправленное пользователем для расчета, сохраняется в памяти Maser.Каждый раз при изменении структуры памяти будет срабатывать планирование, и в Worker будет отправлен запрос LaunchDriver.
    2. Worker получает сообщение LaunchDriver и запускает поток DriverRunner для выполнения задачи LaunchDriver.
    3. Нить Driverrunner начинается на рабочемНовый экземпляр JVM, процесс Driver запускается в экземпляре JVM, и Driver создает объект SparkContext.

    Текущий рабочий узел запускает процесс драйвера

  • Красный: заявка на регистрацию
    После того, как драйвер запустится, он создаст объекты SparkContext, базовый процесс инициализации компонента, необходимый для регистрации мастера приложений, этот процесс описывается следующим образом:

    1. Создание объекта SparkEnv, создание и управление некоторыми базовыми компонентами

    SparkEnv Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently Spark code finds the SparkEnv through a global variable, so all the threads can access the same SparkEnv

    1. Создайте TaskScheduler, отвечающий за планирование задач
    2. Создать standaloneschedulerbackend, отвечаю за переговоры по ресурсам с ClusterManager
    3. Создайте DriverEndpoint, другие компоненты могут взаимодействовать с Driver

    Только что создан, еще не запущен

    1. Создайте StandaloneAppClient внутри StandaloneSchedulerBackend, который отвечает за обработку коммуникационного взаимодействия с Master.
    2. StandaloneAppClient создает ClientEndpoint, который фактически отвечает за связь с мастером.
    3. ClientEndpoint отправляет сообщение о зарегистрировании Master для регистрации приложения
    4. После того, как Мастер получает запрос RegisterApplication, он отвечает на ClientEndpoint сообщением RegisteredApplication, указывающим, что регистрация прошла успешно.
  • Синий: Запустите процесс Executor.

    1. Мастер отправляет Worker-у сообщение LaunchExecutor с просьбой запустить Executor, в то же время Master отправляет Driver-у сообщение ExecutorAdded о том, что Master добавил Executor (который в данный момент еще не запущен)

    Исполнитель на самом деле не запущен, мастер просто отправляет сообщение для запуска исполнителя рабочему.Этот шаг показывает, что мастер отвечает за запуск и назначение исполнителя, а драйвер просто отправляет задачу исполнителю.

    1. Когда Worker получает сообщение LaunchExecutor, он запускает поток ExecutorRunner для выполнения задачи LaunchExecutor.
    2. Рабочий процесс отправляет мастеру сообщение ExecutorStageChanged, чтобы уведомить исполнителя об изменении состояния.
    3. Мастер отправляет сообщение ExcututorUpdated водителю, и исполнитель был запущен в это время

    Здесь мастер действительно сообщает драйверу, что исполнитель запущен

  • Розовый: начать выполнение задачи.

    1. StandaloneSchedulerBackend запускает DriverEndpoint

    Он был создан ранее, но не запущен, а предыдущее взаимодействие с мастером было выполнено с помощью StandaloneSchedulerBackend.

    1. После того, как DriverEndpoint запущен, он будет периодически проверять состояние исполнителя, поддерживаемого драйвером, и, если исполнитель будет простаивать, запланирует выполнение задачи.

    Запустите фоновый поток драйвера-восстановления-потока, периодически отправляйте себе ReviveOffers и позвольте себе проверить статус исполнителя.

    1. Отправка запроса на ресурс Предложение Driverdpoint TasksCheduler

    DriverencePoint - это объект удержания внутри грузопередача

    1. Если есть доступные ресурсы для запуска задачи, DriverEndpoint отправляет запрос LaunchTask исполнителю.
    2. CoarseGrainedExecutorBackend внутри процесса Executor вызывает метод launchTask внутреннего потока Executor для запуска Task.
    3. Поток Executor поддерживает внутренний пул потоков, создает поток TaskRunner и отправляет его в пул потоков для выполнения.
  • Зеленый: выполнение задачи завершено.

    1. Поток Executor внутри процесса Executor уведомляет CoarseGrainedExecutorBackend о том, что Task выполняется.
    2. CARSSEGRIEDEXECUTECUTRECTS statusupdated Отправить сообщение на Driverncoint, уведомить уведомление о состоянии задачи драйвера
    3. StandaloneSchedulerBackend вызывает метод updateStatus TaskScheduler для обновления статуса задачи.

    Родительский класс StandaloneSchedulerBackend CoarseGrainedSchedulerBackend внутренне содержит DriverEndpoint (внутренний класс).scheduler.statusUpdate(taskId, state, data.value)

    1. StandaloneSchedulerBackend продолжает вызывать метод resourceOffers класса TaskScheduler для планирования выполнения других задач.

Кластер Spark Standalone запускает мастер и рабочий отдельно

использоватьstart-all.shСценарий может начать главный процесс, и все рабочие процессы для быстрого запуска всей искровой автономной кластеры

Запустите главный и рабочий процессы соответственно.

Зачем начинать соответственно

Для запуска отдельно можно настроить некоторые уникальные параметры процесса через параметры командной строки
Например, номер порта прослушивания, номер порта веб-интерфейса, используемый процессор и память.
Поскольку не только программа spark, но и программа storm, работающая на машине, может ограничить рабочий процесс spark, чтобы он использовал меньше ресурсов (ядро процессора, память) вместо всех ресурсов на машине.

параметр значение объект частота использования
-h HOST, --ip HOST На какой машине начинать, по умолчанию это машина master & worker редко используется
-p PORT, --port PORT После запуска на машине, какой порт используется для предоставления услуг внешнему миру, мастер по умолчанию — 7077, а рабочий по умолчанию — случайный master & worker редко используется
--webui-port PORT Порт веб-интерфейса, основной по умолчанию — 8080, рабочий по умолчанию — 8081. master & worker редко используется
-c CORES, --cores CORES Общее количество ядер ЦП, которые может использовать искровое задание, по умолчанию — все ядра ЦП на текущей машине. worker Обычно используется
-m MEM, --memory MEM Сколько памяти может использовать задание Spark в целом, в формате 100M или 1G, по умолчанию 1g worker Обычно используется
-d DIR, --work-dir DIR Рабочий каталог, по умолчанию — SPARK_HOME/work directory. worker Обычно используется
--properties-file FILE Адрес, по которому мастер и рабочий загружают файл конфигурации по умолчанию, по умолчанию — conf/spark-defaults.conf. master & worker редко используется

последовательность загрузки

Сначала запустите мастер, а затем рабочий, потому что после запуска рабочего ему нужно зарегистрироваться на мастере

последовательность выключения 1.worker(./stop-slave.sh); 2. Мастер (./stop-master); 3. Закрыть кластер./stop-all.sh

Начать мастер

  1. использоватьstart-master.shзапускать
  2. В журнале запуска будет напечатана строкаspark://HOST:PORT, это URL-адрес мастера, рабочий процесс подключится к мастер-процессу через этот URL-адрес и зарегистрируется

    можно использоватьSparkSession.master()установить главный адрес

  3. в состоянии пройтиhttp://MASTER_HOST:8080Чтобы получить доступ к веб-интерфейсу мониторинга главного кластера, в веб-интерфейсе будет отображаться URL-адрес мастера.

Вручную запустить рабочий процесс

использоватьstart-slave.sh <master-spark-URL>существуеттекущий узелЗапустить рабочий процесс на
http://MASTER_HOST:8080Такая информация, как ресурсы процессора и памяти узла, будет отображаться в веб-интерфейсе.
eg:./start-slave.sh spark://192.168.0.001:8080 --memory 500m

все сценарии запуска и выключения искры

параметр значение
sbin/start-all.sh В соответствии с конфигурацией запустите главный процесс и несколько рабочих процессов на каждом узле в кластере.
sbin/stop-all.sh Остановить все главные и рабочие процессы в кластере
sbin/start-master.sh Запустить мастер-процесс локально
sbin/stop-master.sh Закройте основной процесс
sbin/start-slaves.sh Запустите все рабочие процессы в соответствии с рабочими узлами, настроенными в файле conf/slaves.
sbin/stop-slaves.sh Остановить все рабочие процессы
sbin/start-slave.sh Запустить рабочий процесс локально

конфигурационный файл

конфигурация рабочего узла

Настройте машину как рабочий узел, например, имя хоста/IP-адрес, одна машина — это линия
После настройки скопируйте этот файл на все узлы
По умолчанию файла conf/slavs нет, есть только один пустой conf/slaves.template, На этом этапе просто запустите главный процесс и рабочий процесс на текущем первичном узле.В это время главный процесс и рабочий процесс находятся на одном узле, то есть развертывание псевдо-дистрибутива.
Пример файла conf/slaves

spark1  
spark2  
spark3  

параметры conf/spark-evnsh

Это кластерное развертывание всей искры с настройкой каждого мастера и рабочего.

То же, что и сценарий запуска --parameter./start-slave.sh spark://192.168.0.001:8080 --memory 500m, эта команда скрипта больше подходит для временного изменения параметров
Параметры командной строки имеют более высокий приоритет и переопределяют параметры spark-evnsh.

параметр значение
SPARK_MASTER_IP Укажите IP-адрес машины, на которой находится главный процесс.
SPARK_MASTER_PORT Указывает номер порта, который слушает мастер (по умолчанию 7077).
SPARK_MASTER_WEBUI_PORT Укажите номер порта основного веб-интерфейса (по умолчанию 8080).
SPARK_MASTER_OPTS Установите дополнительные параметры мастера, используйте "-Dx=y" для установки каждого параметра
SPARK_LOCAL_DIRS Рабочий каталог Spark, включая выходные файлы карт перетасовки, RDD, сохраненные на диске, и т. д.
SPARK_WORKER_PORT Номер порта рабочего узла, который по умолчанию является случайным
SPARK_WORKER_WEBUI_PORT Номер порта веб-интерфейса рабочего узла, по умолчанию 8081.
SPARK_WORKER_CORES На рабочем узле максимальное количество процессоров, разрешенное для искровых заданий, по умолчанию — все ядра процессора на машине.
SPARK_WORKER_MEMORY На рабочем узле максимальный объем памяти, разрешенный для искровых заданий, формат — 1000 м, 2 г и т. д. Минимум по умолчанию — 1 г памяти.
SPARK_WORKER_INSTANCES Количество рабочих процессов на текущей машине, по умолчанию 1,Может быть установлено несколько, но обязательно установите SPARK_WORKER_CORES в это время, чтобы ограничить количество ЦП на одного рабочего
SPARK_WORKER_DIR Рабочий каталог задания spark, включая журнал задания и т. д. По умолчанию — spark_home/work.
SPARK_WORKER_OPTS Дополнительные параметры воркера, используйте "-Dx=y" для установки каждого параметра
SPARK_DAEMON_MEMORY Память, выделенная самим мастеру и рабочим процессам, по умолчанию 1g
SPARK_DAEMON_JAVA_OPTS Установите собственные параметры jvm мастера и рабочего, используйте "-Dx=y" для установки каждого параметра
SPARK_PUBLISC_DNS Общедоступное DNS-имя домена мастера и рабочего по умолчанию недоступно.
  • SPARK_MASTER_OPTS
    Для установки дополнительных параметров мастера используйте-Dx=yУстановите каждый параметр
    eg:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"

    имя параметра По умолчанию значение
    spark.deploy.retainedApplications 200 Максимальное количество приложений, отображаемых в веб-интерфейсе искры.
    spark.deploy.retainedDrivers 200 Максимальное количество водителей, отображаемых в веб-интерфейсе искры.
    spark.deploy.spreadOut true Стратегия планирования ресурсов, spreadOut попытается распределить процесс-исполнитель приложения на большее количество воркеров, что подходит для расчета на основе файлов hdfs и повысит вероятность локализации данных; не-spreadOut попытается выделить исполнителя на воркер, который подходит для вычислительной интенсивной эксплуатации
    spark.deploy.defaultCores Неограниченный Максимальное количество ядер ЦП, которое может использовать каждое задание Spark в автономном кластере, по умолчанию бесконечно, сколько используется
    spark.deploy.timeout 60 Через секунду, сколько времени не отвечает, Мастер считает, что работник висит.
  • SPARK_WORKER_OPTS
    Дополнительные параметры для воркера

    имя параметра По умолчанию значение
    spark.worker.cleanup.enabled false Запускать ли автоматическую очистку рабочего каталога воркера, по умолчанию — false
    spark.worker.cleanup.interval 1800 В секундах временной интервал для автоматической очистки, по умолчанию 30 минут.
    spark.worker.cleanup.appDataTtl 7 * 24 * 3600 По умолчанию, сколько времени файл искрового задания находится в рабочем каталоге рабочего, по умолчанию 7 дней.

Приложение Spark запущено

локальный режим

В основном используется для нативного тестирования

/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 2 \
/usr/local/test/xxx.jar \

автономный режим

настройки параметров

автономный режим отличается от локального, вы хотите установить мастерspark://master_ip:port,какspark://192.168.0.103:7077

  1. Код:val spark = SparkSession.builder().master("spark://IP:PORT")...
  2. spark-submit: --master spark://IP:PORT --deploy-mode client/cluster
    Режим клиента по умолчанию
  3. spark-shell: --master spark://IP:PORT: для экспериментов и испытаний
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.study.core.xxx \
    --master spark://192.168.0.103:7077 \
    --deploy-mode client \
    --num-executors 1 \
    --driver-memory 100m \
    --executor-memory 100m \
    --executor-cores 1 \
    /usr/local/test/xxx.jar \
    

--master:

  1. не устанавливать: локальный режим
  2. spark://xxx:автономный режим, он будет отправлен в главный процесс указанного URL-адреса
  3. yarn-xxx:yarn, прочитает файл конфигурации Hadoop, а затем подключится к ResourceManager.

рабочий процесс в автономном клиентском режиме

После отправки запущенного задания используйте jps для немедленного просмотра процесса, и вы увидите, что запущены следующие процессы.

  1. SparkSubmit: процесс-драйвер, запущенный на этой машине (машине, на которой находится spark-submit)
  2. CoarseGrainedExecutorBackend (внутренне содержит объект Executor, CoarseGrainedExecutorBackend — это процесс-исполнитель): на рабочей машине, выполняющей искровое задание, назначьте и запустите процесс-исполнитель для задания.
    SparkSubmit назначает задачи CoarseGrainedExecutorBackend

автономный кластерный режим

Автономный режим кластера поддерживает мониторинг процесса драйвера и автоматически перезапускает процесс, когда драйвер зависает, в основном используется для обеспечения высокой доступности в потоковой передаче spark.--superviseПросто определите

Чтобы убить процесс драйвера, который постоянно зависаетbin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>,пройти черезhttp://<maser url>:8080Вы можете увидеть идентификатор водителя

убить приложение под пряжейyarn application -kill applicationid

процесс:

  1. SparkSubmit выполняется недолго, просто регистрирует драйвер на мастере, запускает драйвер с мастера и немедленно останавливается;
  2. На Worker будет запущен процесс DriverWrapper
  3. Если может быть применено достаточно ресурсов ЦП, процесс CoarseGrainedExecutorBackend будет запущен на других воркерах.
...
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
...

В кластерном режиме

  1. Рабочий запускает драйвер и занимает процессорное ядро
  2. Драйвер запрашивает ресурсы у мастера и запускает исполнительный процесс на рабочем с незанятыми ресурсами ЦП.

Слишком мало ядер процессора может привести к тому, что исполнитель не запустится и будет продолжать ждать, например, когда есть только один рабочий процесс и одно ядро ​​​​процессора.

В кластерном режиме драйвер запускается в процессе в воркере в кластере, а клиентский процесс завершится после выполнения задачи отправки заявки, не дожидаясь завершения приложения и последующего выхода

Автономное планирование ресурсов для нескольких заданий

Каждое задание Spark, отправленное по умолчанию, попытается использоватьвсе доступные ресурсы процессораВ это время может поддерживать только рабочие местасериалОн запускается, поэтому автономный кластер поддерживается только для нескольких заданий, которые отправляются одновременно.Политика планирования FIFO

  • Настройте несколько заданий для одновременного запуска
    можно установитьspark.cores.maxПараметры, ограничивающие каждое задание, которое может использовать максимальное количество ядер процессора, чтобы задания не использовали все ресурсы процессора, при резервном копировании задания вы можете запустить ресурсы, по умолчанию он получит кластер всех ядер (ядерный) , который разрешает только одно значение за раз, когда приложение работает.
  1. spark.conf.set("spark.cores.max", "num")
  2. Отправить команду скриптаspark-submit: --master spark://IP:PORT --conf spark.cores.max=num
  3. spark-env.shГлобальная конфигурация:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=num" 默认数量

standalone web ui

Автономный режим spark предоставляет веб-интерфейс по умолчанию на порту 8080 на главном компьютере, который можно настроить с помощьюspark-env.shфайл и т. д., чтобы настроить порт веб-интерфейса, адрес такой, какspark://192.168.0.103:8080

Режим пряжи Spark следует просматривать в веб-интерфейсе YARN, напримерhttp://192.168.0.103:8088/

  • application web ui

Пользовательский интерфейс сведений о приложении находится на порту 4040 машины, на которой находится драйвер задания.

  • Оперативный уровень
    Может использоваться для конкретных задач позиционирования, таких как
    1. Неравномерное распределение данных задачи: перекос данных
    2. Этап выполняется в течение длительного времени: в соответствии с алгоритмом разделения этапов найдите код, соответствующий этапу, чтобы оптимизировать производительность.
    3. журналов на исполнителя на задание
      stdout:System.out.println;
      stderr:System.err.printlnи журнал системного уровня

      После выполнения задания информация исчезает, и необходимо запустить сервер истории.

режим пряжи

помещение:spark-env.shфайл, настроитьHADOOP_CONF_DIRилиYARN_CONF_DIRАтрибут, значением которого является каталог конфигурационного файла hadoop.HADOOP_HOME/etc/hadoop, который содержит все конфигурационные файлы хаупа и пряжи, такие как hdfs-site, пряжа-сайт и т. д.
Цель: искровое чтение и запись hdfs, подключение к менеджеру ресурсов пряжи

Два режима работы

  • режим клиента пряжи
    Процесс драйвера будет запущен на машине, которая отправляет задание. ApplicationMaster отвечает только за запрос ресурсов (исполнитель) из пряжи для задания. Драйвер по-прежнему будет нести ответственность за планирование задания.
  • режим пряжи-кластера
    Процесс драйвера будет работать на рабочем узле кластера пряжи как процесс ApplicationMaster.

Посмотреть журналы пряжи

Логи разбросаны по каждой машине в кластере, а параметры настраиваются в yarn-site.xml

  1. Метод совокупного журнала (рекомендуется)
    настройки свойства значение
    yarn.log-aggregation-enable=true Журнал контейнера будет скопирован в hdfs и удален с машины.
    yarn.nodemanager.remote-app-log-dir Когда приложение завершает работу, каталог HDFS, в который передаются журналы (действительно, когда включена функция агрегирования журналов)
    yarn.nodemanager.remote-app-log-dir-suffix Имя подкаталога удаленного каталога журналов (действительно при включенной функции агрегирования журналов)
    yarn.log-aggregation.retain-seconds Как долго агрегированные журналы хранятся в HDFS, с
    yarn logs -applicationId <app ID> Просмотрите журнал, applicationId можно просмотреть в веб-интерфейсе пряжи (вы также можете просмотреть файл журнала непосредственно на hdfs)
    yarn.nodemanager.log.retain-second когдаНе включеноЭтот параметр регистрирует начало полимеризации, файл журнала хранится в локальном времени в секундах.
    yarn.log-aggregation.retain-check-interval-seconds Как часто удалять журналы с истекшим сроком действия
  2. просмотр веб-интерфейса
    Необходимо запустить сервер истории, запустить сервер истории искры и сервер истории mapreduce.
    Без настройки вы можете только просматривать журнал работы
    конфигурация см.Spark History Web UIодин период
  3. Рассредоточенный вид
    Журнал по умолчанию находится вYARN_APP_LOGS_DIRкаталог, например/tmp/logsили$HADOOP_HOME/logs/userlogs
    Если сервер истории не включен в кластере пряжи, вы хотите просмотретьsystem.outжурнал, который должен бытьyarn-site.xmlустановить в файлyarn.log.aggregation-enableЗначение true (скопировать лог в hdfs), которое передается при просмотреyarn logs -applicationId xxxПосмотреть на машине

представить сценарий

/usr/local/spark/bin/spark-submit \
--class xxx \
# 自动从hadoop配置目录中的配置文件中读取cluster manager地址
--master yarn-cluster/yarn-client \ 
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--conf <key>=<value> \
# 指定不同的hadoop队列,项目或部门之间队列隔离
--queue hadoop队列 \
/usr/local/test/xxx.jar \
${1}

--conf: настроить все свойства конфигурации, поддерживаемые spark, использоватьkey=valueФормат; если значение содержит пробелы, то необходимо заключить ключ=значение в двойные кавычки.--conf "<key>=<value>"
application-jar: полный путь к упакованному пакету jar проекта spark на текущем компьютере.
application-arguments: параметр передается в основной метод основного класса; используется в оболочке${1}Заполнитель получает параметры, передавающие в оболочку; может пройти основной метод в Javaargs[0]И другие параметры для получения, при отправке искровых приложений,./脚本.sh 参数值

свойства запуска искрового задания в режиме пряжи

может быть в сценарии фиксации--confустановить свойства

Имя свойства По умолчанию значение
spark.yarn.am.memory 512m В клиентском режиме общий объем памяти, используемый Мастером приложений YARN.
spark.yarn.am.cores 1 В клиентском режиме количество процессоров, используемых Application Master
spark.driver.cores 1 В кластерном режиме количество процессорных ядер, используемых драйвером, драйвер и мастер приложений работают в одном процессе, поэтому количество процессоров мастера приложений также контролируется.
spark.yarn.am.waitTime 100s В кластерном режиме мастер приложения ожидает времени инициализации SparkContext, в клиентском режиме мастер приложения ждет, пока к нему подключится драйвер.
spark.yarn.submit.file.replication количество реплик hdfs Количество копий файлов, записанных в hdfs заданием, таких как файлы jar проекта, зависимые файлы jar, файлы конфигурации и т. д. Минимум должен быть 1.
spark.yarn.preserve.staging.files false Если установлено значение true, то после работы задания банка проекта будет удалена от удаления.
spark.yarn.scheduler.heartbeat.interval-ms 3000 Интервал, с которым мастер приложения отправляет тактовый сигнал диспетчеру ресурсов, в мс.
spark.yarn.scheduler.initial-allocation.interval 200ms Интервал, с которым мастер приложения сразу же отправляет тактовый сигнал диспетчеру ресурсов при наличии отложенного требования о выделении контейнера.
spark.yarn.max.executor.failures Количество исполнителей*2, минимум 3 Максимальное количество отказов исполнителя, прежде чем вся работа будет признана неудачной.
spark.yarn.historyServer.address никто адрес сервера истории искры
spark.yarn.dist.archives никто Архив, который получает каждый исполнитель и кладет в рабочий каталог
spark.yarn.dist.files никто Файлы в рабочем каталоге, которые каждый исполнитель должен поместить в
spark.executor.instances 2 Количество исполнителей по умолчанию
spark.yarn.executor.memoryOverhead память исполнителя 10% Размер памяти вне кучи для каждого исполнителя, используемой для хранения таких вещей, как константные строки.
spark.yarn.driver.memoryOverhead память драйвера 7% То же
spark.yarn.am.memoryOverhead АМ-память 7% То же
spark.yarn.am.port случайный главный порт приложения
spark.yarn.jar никто расположение файла jar jar
spark.yarn.access.namenodes никто Адрес namenode hdfs, к которому могут обращаться задания Spark.
spark.yarn.containerLauncherMaxThreads 25 Максимальное количество потоков, которое мастер приложения может использовать для запуска контейнера исполнителя.
spark.yarn.am.extraJavaOptions никто jvm параметры мастера приложения
spark.yarn.am.extraLibraryPath никто Дополнительные пути к библиотекам для мастера приложений
spark.yarn.maxAppAttempts Максимальное количество попыток отправки искрового задания
spark.yarn.submit.waitAppCompletion true Ожидает ли клиент в кластерном режиме завершения задания перед выходом?

О схеме высокой доступности мастера

В автономном режиме планировщик полагается на главный процесс для принятия решений по планированию, что может создать единую точку отказа: если главный процесс умирает, новые приложения не могут быть отправлены.
Для решения этой проблемы spark предлагает два решения высокой доступности, а именно:на основе зоопаркапротокол HA (рекомендуется) ифайловая системаРешение HA.

ZOOOKEEPER на основе HA раствор

Обзор

Используя zookeeper для обеспечения выбора лидера и некоторого хранилища состояния, вы можете запустить несколько главных процессов в кластере и позволить им подключаться к экземпляру zookeeper. Один из мастер-процессов будет выбран ведущим, а остальные мастер-процессы будут назначены в режим ожидания.
Если текущий главный процесс-лидер умирает, другие резервные мастера будут выбраны для восстановления состояния старого мастера.

настроить

После запуска кластера zookeeper запустите несколько главных процессов на нескольких узлах и назначьте им один и тот же zookeeper. Конфигурация (URL-адрес и каталог zookeeper). мастера могут быть динамически добавлены в главный кластер и удалены в любое время

существуетspark-env.shфайл, наборSPARK_DAEMON_JAVA_OPTSОпции:

  1. spark.deploy.recoveryMode: Установите ZOOKEEPER, чтобы включить резервный главный режим восстановления (по умолчанию НЕТ).
  2. spark.deploy.zookeeper.url: URL кластера zookeeper
  3. spark.deploy.zookeeper.dir:directory в zookeeper для хранения состояния восстановления (по умолчанию/spark)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.0.103:2181,192.168.0.104:2181 -Dspark.deploy.zookeeper.dir=/spark"

Если в кластере запущено несколько мастер-нод, но мастер не настроен должным образом для использования zookeeper, мастер выйдет из строя, когда он зависнет для восстановления, потому что другие мастера не могут быть найдены, и все они будут думать, что они лидеры. Это приведет к неработоспособному состоянию кластера, так как все мастера будут выполнять планирование самостоятельно.

деталь

Чтобы запланировать новые приложения или добавить рабочие узлы в кластер, им необходимо знать IP-адрес текущего ведущего мастера, что можно сделать, передав список мастеров. Вы можете указать адрес главного соединения SparkSession наspark://host1:port1,host2:port2. Это приведет к тому, что SparkSession попытается зарегистрировать все мастера.Если хост 1 умирает, конфигурация правильная, поскольку будет найден новый ведущий мастер.

При запуске приложения или при необходимости найти работника и зарегистрировать его у текущего мастера-лидера. После успешной регистрации онСохранить в зоопарке.В случае сбоя новый мастер-лидер свяжется со всеми ранее зарегистрированными приложениями и воркерами и уведомит их об изменении мастера.. Приложению даже не нужно знать о существовании нового мастера при запуске.

Таким образом, новый мастер может быть создан в любое время, пока новые приложения и рабочие могут быть найдены, и вы можете зарегистрироваться на мастере.

Запустите резервный мастер на другом узле:./start-master.sh

Схема высокой доступности на основе файловой системы

Обзор

Режим FILESYSTEM: когда и приложения, и рабочие процессы зарегистрированы в мастере, мастер запишет свою информацию в указанный каталог файловой системы, чтобы зарегистрированное приложение и статус рабочих процессов можно было восстановить при перезапуске;
Требуется ручной перезапуск

настроить

существуетspark-env.shустановить вSPARK_DAEMON_JAVA_OPTS

  1. spark.deploy.recoveryMode: установите значение FILESYSTEM, чтобы включить восстановление из одной точки (по умолчанию NONE).
  2. spark.deploy.recoveryDirectory: каталог файловой системы, в котором spark хранит информацию о состоянии, которая должна быть доступна для мастера.

eg:

export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/usr/local/spark_recovery"

деталь

  1. Этот режим больше подходит для сред разработки и тестирования.
  2. stop-master.shСкрипт, убивающий главный процесс, не очищает его состояние восстановления, при перезапуске нового главного процесса он переходит в режим восстановления. Вам нужно дождаться тайм-аута всех ранее зарегистрированных рабочих и других узлов, прежде чем их можно будет восстановить.
  3. Вы можете использовать каталог NFS (похожий на HDFS) в качестве каталога восстановления. Если исходный главный узел умирает, главный процесс может быть запущен на другом узле, и он правильно восстановит все ранее зарегистрированные рабочие процессы и приложения. Более поздние приложения могут найти нового мастера и зарегистрироваться.

мониторинг искровых заданий

Методы мониторинга заданий: веб-интерфейс Spark, веб-интерфейс Spark History, RESTFUL API и метрики.

Spark Web UI

После отправки каждого задания Spark и запуска SparkSession запускается соответствующая служба веб-интерфейса Spark. По умолчанию адрес доступа веб-интерфейса Spark — это порт 4040 узла, на котором расположен процесс драйвера, напримерhttp://<driver-node>:4040

Веб-интерфейс Spark включает следующую информацию:

  1. список этапов и задач
  2. Обзор размера RDD и использования памяти
  3. Информация об окружающей среде
  4. Информация об исполнителе, соответствующем заданию

Если на машине запущено несколько драйверов, они автоматически привязываются к разным портам. По умолчанию он начинается с порта 4040. Если обнаружится, что он был привязан, будут выбраны такие порты, как 4041 и 4042, и так далее.

Эта информация по умолчаниюДействует во время выполнения задания, После завершения задания процесс драйвера и соответствующая веб-служба пользовательского интерфейса также останавливаются. Если вы хотите увидеть веб-интерфейс Spark и подробности после завершения задания, вам необходимо включить сервер истории Spark.

Spark History Web UI

  1. Создать каталог для хранения журналов
    созданный каталогhdfs://ip:port/dirName
    Заказhdfs dfs -mkidr /dirName
  2. Исправлятьspark-defaults.conf
    spark.eventLog.enabled  true    #启用
    spark.eventLog.dir      hdfs://ip:port/dirName
    spark.eventLog.compress true    #压缩
    
  3. Исправлятьspark-env.sh
    export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://ip:port/dirName"
    

    spark.eventLog.dirУкажите адрес журнала событий задания
    spark.history.fs.logDirectoryУказывает каталог, из которого считываются данные задания.
    Два адреса каталога должны быть одинаковыми

  4. Начать историю
    ./sbin/start-history-server.shВы можете увидеть адрес доступа истории-сервера в интерфейсе запуска, и открыть веб-интерфейс истории через адрес доступа

RESTFUL API

Предоставляет RESTFUL API для возврата json-данных о журналах.

API значение
/applications Получить список вакансий
/applications/[app-id]/jobs список заданий для указанной работы
/applications/[app-id]/jobs/[job-id] Информация об указанной работе
/applications/[app-id]/stages Определяет список стадий для задания
/applications/[app-id]/stages/[stage-id] Список всех попыток для указанного этапа
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] Укажите информацию о попытке этапа
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary Указывает метрическую статистику всех задач в попытке этапа.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList Укажите список задач попытки этапа
/applications/[app-id]/executors Список исполнителей для указанной работы
/applications/[app-id]/storage/rdd Список постоянных rdd для указанного задания
/applications/[app-id]/storage/rdd/[rdd-id] Указывает информацию для сохранения rdd
/applications/[app-id]/logs Загрузите указанное задание Все архив журнала
/applications/[app-id]/[attempt-id]/logs Скачать сжатый пакет всех журналов попытки выполнения указанного задания

eg:http://192.168.0.103:18080/api/v1/applications

Планирование рабочих ресурсов

Распределение статических ресурсов

  1. Параллелизм приложений: каждое искровое приложение будет запускать свой собственный независимый пакет процессов-исполнителей для выполнения задач и хранения данных.В это время диспетчер кластера будет обеспечивать функцию планирования нескольких приложений одновременно.
  2. Параллелизм заданий: в каждом приложении Spark несколько заданий также могут выполняться параллельно.

Отправьте несколько приложений Spark одновременно

Стратегия распределения ресурсов по умолчанию между заданиями — это статическое распределение ресурсов. Ограничение максимального количества ресурсов, которые могут храниться во время выполнения. Это метод по умолчанию, используемый автономными кластерами spark и кластерами YARN.

  • Автономный кластер
    По умолчанию несколько заданий, отправленных в автономный кластер, будут проходитьFIFOспособ запуска, каждое задание будет пытаться получить все ресурсы.
    spark.cores.max: ограничить максимальное количество ядер ЦП, которое может использовать каждое задание.
    spark.deploy.defaultCores: установить использование ядра ЦП по умолчанию для каждого задания.
    spark.executor.memory: Установите максимальный объем памяти для каждого задания.

  • YARN
    --num-executors: Настройте, сколько рабочих мест может быть назначена исполнителю в кластере
    --executor-memoryи--executor-coresВы можете контролировать ресурсы, которые может использовать каждый исполнитель.

Нет менеджера кластера, который может обеспечить функцию разделения памяти между несколькими заданиями.Если вам нужно разделить память, вы можете использовать один сервис (например: alluxio), чтобы несколько приложений могли получить доступ к данным одного и того же RDD.

Динамическое распределение ресурсов

Когда ресурс выделен для задания, но он свободен, этот ресурс можно вернуть в пул ресурсов менеджера кластера для использования другими заданиями. В Spark динамическое выделение ресурсов реализовано на уровне исполнителя, установленном при включении.spark.dynamicAllocation.enabledЕсли true, запустите внешнюю службу перетасовки на каждом узле и установите для spark.shuffle.service.enabled значение true. Назначение внешней службы перетасовки состоит в том, чтобы сохранить файлы перетасовки, выводимые исполнителем, когда исполнитель удален.

стратегия применения

Приложение Spark будет обращаться к дополнительным исполнителям, когда у него есть ожидающие выполнения задачи, ожидающие планирования.

Задача отправлена, но ожидает планирования -> количество исполнителей недостаточно

  1. Драйвер обращается к исполнителю опросным способом
    когда в течение определенного времениspark.dynamicAllocation.schedulerBacklogTimeoutКогда есть ожидающая задача, будет запущено настоящее приложение-исполнитель.
  2. через определенное времяspark.dynamicAllocation.sustainedSchedulerBacklogTimeout, если есть отложенная задача, тосноваЗапустите операцию приложения.
  3. Количество исполнителей, подаваемых в каждом раунде, увеличивается экспоненциально (например, 1, 2, 4, 8, ..): Есть две причины для стратегии экспоненциального роста:
    Во-первых, если какому-либо приложению Spark нужно подать заявку только на несколько дополнительных исполнителей, оно должно быть очень осторожным при запуске ресурсного приложения, что несколько похоже на медленный запуск TCP;
    Во-вторых, если приложению Spark действительно нужно обращаться к нескольким исполнителям, оно может гарантировать, что необходимые ему вычислительные ресурсы со временем возрастут.

Политика удаления

Зажигательная работа будет работать без простоя после того, как его исполнители были простаиваются в течение более чем определенного времени (spark.dynamicAllocation.executorIdleTimeout), был удален.

Это означает, что задачи не ожидаются, исполнитель бездействует, а условия приложения являются взаимоисключающими.

сохранить промежуточное состояние

Spark использует внешнюю службу перетасовки для сохранения промежуточного состояния записи каждого исполнителя. Эта служба представляет собой длительный процесс, который выполняется на каждом узле кластера. Если служба включена, исполнитель spark будет выполнять перетасовку записи и при чтении, записывать данные в службу и получать данные из службы. Это означает, что все данные тасования, записанные исполнителем, могут продолжать использоваться вне цикла объявления исполнителя.

Добавление роли промежуточного хранилища данных также меняет способ чтения и записи исполнителем.

Помимо записи файлов в случайном порядке, исполнители также сохраняют данные в памяти или на диске. При удалении исполнителя все кэшированные данные исчезают.

Данные, записанные службой перемешивания, и данные, сохраняемые исполнителем, — это не одно и то же?После того, как исполнитель будет удален / повесит трубку, постоянные данные исчезнут, а данные, сохраненные службой перемешивания, все еще будут существовать.

Динамическое выделение ресурсов в автономном режиме

  1. Установите для spark.shuffle.service.enabled значение true до запуска рабочего процесса.
  2. application
    --conf spark.dynamicAllocation.enabled=true \
    

Динамическое распределение ресурсов в режиме Mesos

  1. запустить на каждом узле$SPARK_HOME/sbin/start-mesos-shuffle-service.sh, и установитеspark.shuffle.service.enabledистинный
  2. application
    --conf spark.dynamicAllocation.enabled=true \
    

Динамическое распределение ресурсов в режиме пряжи

Служба тасования Yarn (внешняя служба тасования) должна быть настроена для сохранения файла записи исполнителя в случайном порядке, чтобы исполнитель можно было безопасно удалить.

  1. Добавить пакет банок
    будет$SPARK_HOME/libвнизspark-<version>-yarn-shuffle.jarпринять участие ввсеКлассический путь Nodemanager, то естьhadoop/yarn/libв каталоге
  2. Исправлятьyarn-site.xml
    <propert>
        <name>yarn.nodemanager.aux-services</name>
        <value>spark_shuffle</value>
        <!-- <value>mapreduce_shuffle</value> -->
    </property>
    <propert>
        <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
        <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    
  3. запустить искровое приложение
    --conf spark.shuffle.service.enabled=true \
    --conf spark.shuffle.service.port=7337 \
    --conf spark.dynamicAllocation.enabled=true \
    

видетьConfiguring the External Shuffle Service

Планирование нескольких заданий

Задание – это вычислительная единица, активируемая операцией искрового действия. В искровом задании одновременно может выполняться несколько параллельных заданий. .

ФИФО Планирование

По умолчанию искровое планирование используетFIFOспособ планирования нескольких заданий. Каждое задание будет разделено на несколько этапов, и первое задание получит приоритетный доступ ко всем доступным ресурсам и позволит выполнять задачи своего этапа, а затем второе задание получит право использовать ресурсы для И так далее

Справедливое планирование

В соответствии со стратегией справедливого распределения ресурсов Spark будет использовать метод циклического перебора для распределения ресурсов и выполнения задач нескольких заданий, поэтому все задания имеют в основном справедливую возможность использовать ресурсы кластера.

conf.set("spark.scheduler.mode", "FAIR")
--conf spark.scheduler.mode=FAIR

Пул ресурсов справедливого планирования

Справедливый планировщик также поддерживает разделение заданий на несколько групп и размещение их в нескольких пулах, а также установку различных приоритетов планирования для каждого пула. Эта функция полезна в ситуациях, когда важные и неважные задания выполняются изолированно, один пул может быть выделен для важных заданий и иметь более высокий приоритет, другой пул выделен для неважных заданий и иметь более низкий приоритет.

Установить в кодеsparkContext.setLocalProperty("spark.scheduler.pool", "poolName"), все в этомнитьВсе задания, представленные в системе, будут поступать в этот пул, а настройки сохраняются в единицах потоков.Одним и тем же потоком легко отправить все задания одного и того же пользователя в один и тот же пул ресурсов. Установите значение null, чтобы очистить пул.

По умолчанию каждый пул будет иметь одинаковый приоритет использования ресурсов кластера, но внутри каждого пула задания будут выполняться в режиме FIFO.

Свойства пула можно изменить через файл конфигурации

  1. schedulingMode: FIFO/FAIR, чтобы контролировать, будут ли задания в пуле поставлены в очередь или ресурсы в общем пуле
  2. вес: контролирует долю ресурсов, которые могут быть выделены пулу ресурсов по отношению к другим пулам ресурсов. По умолчанию вес всех пулов равен 1. Если вес пула ресурсов установлен на 2, то ресурсы в пуле ресурсов будут в два раза больше, чем в других пулах.Если для веса задано высокое значение, например 1000, может быть достигнуто планирование приоритетов между пулами ресурсов — пул ресурсов с весом = 1000 всегда может немедленно начать соответствующее задание.
  3. minShare: Минимальное значение выделения ресурсов (количество ЦП) для каждого пула ресурсов. Справедливый планировщик всегда будет сначала пытаться удовлетворить минимальное значение выделения ресурсов для всех активных пулов ресурсов, а затем выделять остальные ресурсы в соответствии с весом каждого ресурса пула. . Таким образом, свойство minShare гарантирует, что каждый пул ресурсов получит по крайней мере определенное количество ресурсов кластера. Значение по умолчанию minShare равно 0.

Адрес файла конфигурации по умолчаниюspark/conf/fairscheduler.xml, пользовательский файлconf.set("spark.scheduler.allocation.file", "/path/to/file")

<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

Пулы ресурсов, не настроенные в файле конфигурации, будут использовать конфигурацию по умолчанию (schedulingMode: FIFO, weight: 1, minShare: 0).

Общие операторы Spark

union

union算子

  1. Новый rdd скопирует разделы двух старых rdd.
  2. Количество нового раздела RDD, то есть количество раздела старого и двух RDD

groupByKey

groupByKey
При выполнении оператора класса shuffle внутри оператора неявно создается несколько СДР, в основном как выражение некоторых промежуточных данных этой операции, и как граница разделения этапов.

reduceByKey

reduceByKey
reduceByKey VS groupByKey

  • разница
    reduceByKey, посередине есть MapPartitionsRDD, далокальная агрегация данныхПосле rdd это может уменьшить передачу данных по сети.

  • то же
    Процесс чтения и агрегации в основном аналогичен процессу groupByKey. Все ShuffledRDD читают в случайном порядке, а затем объединяют, чтобы получить окончательный rdd

distinct

distinct

  1. преобразовать каждое примитивное значение в кортеж
  2. Будет выполнена локальная агрегация (аналогично reduceByKey)
  3. преобразует кортеж обратно в одно значение в конце

cogroup

Оператор когруппы является основой других операторов, таких как операции соединения и пересечения.

cogroup

Сначала агрегируйте результаты по разделу RDD (hello,[(1,1),(1,1)]): первый (1,1) — результат helo-агрегации первого RDD, второй (1,1) ) — второй результат агрегации RDD Если в первом разделе первого RDD нет приветствия, то (1), а не (,1)

intersection

intersection

фильтр: отфильтровать ключи, любой набор которых пуст в двух наборах

join

join

  1. cogroup, объединяет ключи двух rdd
  2. flatMap, каждая часть агрегированных данных может возвращать несколько частей данных. Сделайте декартово произведение всех элементов двух наборов, соответствующих каждому ключу

sortByKey

sortByKey

  1. ShuffledRDD, чтение в случайном порядке, перенос того же ключа на http://ozijnir4t.bkt.clouddn.com/spark/learning/sortByKey.pngpartition
  2. mapPartitions, который глобально сортирует ключи в каждом разделе

cartesian

Декартово произведение

cartesian

coalesce

Обычно используется для уменьшения количества разделов

coalesce

repartition

оператор перераспределения = объединение (истина)

repartition

Операция перераспределения будет вычислять значение в середине сгенерированного неявного RDD для вычисления префикса в качестве ключа, а раздел в последней операции перетасовки помещается в TUPLE, соответствующий некоторым значениям ключа, завершая операцию области восстановления.

Точка знаний

Связь между количеством узлов, количеством разделов RDD, количеством ядер ЦП в кластере Spark и степенью параллелизма.

并行度数量关系

  1. Каждый файл содержит несколько блоков
  2. Когда Spark читает входной файл, он анализирует его в соответствии с InputFormat, соответствующим конкретному формату данных.Как правило, несколько блоков объединяются в один входной срез, который называется InputSplit.Обратите внимание, чтоInputSplit не может охватывать файлы
  3. InputSplit генерирует задачу
  4. Каждый Исполнитель состоит из нескольких ядер, и каждое ядро ​​каждого Исполнителя может одновременно выполнять только одну Задачу.
  5. После выполнения каждой задачи создается раздел целевого RDD.

Если количество разделов велико и ресурсы, которые могут запускать экземпляры, также велики, естественного параллелизма будет больше.
Если количество разделов невелико, а ресурсов много, количество задач недостаточно, и у него не будет много одновременных задач.
Если количество разделов велико, а ресурсов мало (например, ядро), то параллелизм невелик, и следующий пакет будет засчитан после одного пакета.

Task被执行的并发度 = Executor数目 * 每个Executor核数

Ядром здесь является виртуальное ядро, а не физическое ядро ​​ЦП машины, можно понимать как рабочий поток Исполнителя?
Количество ядер на одного исполнителя передаетсяspark.executor.coresнастройки параметров. Ядра здесь на самом деле относятся крабочий поток. Количество ядер, указанное в информации о процессоре, — это физические ядра (или количество физических ядер после того, как на обычной машине реализована технология Hyper-Threading*2), что не совпадает с количеством ядер в spark, но, вообще говоря, числом исполнителей. Количество ядер, настроенных искровым заданием, отличается. Должно превышать количество физических ядер машины.

количество разделов

  1. Данные считываются в фазе, напримерsc.textFile, сколько InputSplits разделит входной файл, потребует, сколько первоначальных Tasks
  2. Количество разделов на этапе карты остается неизменным
  3. Стадия сокращения, запуск полимеризации операции перемешивания RDD, номер раздела RDD после полимеризации с конкретными операциями, связанными, например, с операцией перераспределения, конвергентным синтетическим заданным количеством разделов, и некоторые операторы настраиваются.

использованная литература

  1. Подробно изучите реализацию Spark в случайном порядке
  2. Оптимизация производительности Spark: настройка ресурсов
  3. В кластере Spark взаимосвязь между количеством узлов в кластере, количеством разделов RDD, количеством ядер ЦП и степенью параллелизма.
  4. Spark Notes — перераспределение и объединение
  5. Подробное объяснение Sparksession Spark 2.0 серии 2.0
  6. Агрегация журнала пряжи, связанная с параметром
  7. Глубокое понимание Spark 2.1 Core (1): принцип и анализ исходного кода RDD
  8. Spark 2.0 от входа до мастерства
  9. искра 2.2.0 документация