Дополнение к некоторым небольшим вопросам об основах Spark

Spark

предисловие

Некоторые дополнения, основанные на знаниях, не упомянутых в двух предыдущих статьях.

3. Платформа вычислений в памяти Spark

3.1 Планирование задач Spark

  1. Сторона драйвера запускает основной метод клиента и строитSparkContextобъект, который последовательно создается внутри объекта SparkContextDAGSchedulerа такжеTaskScheduler
  2. Согласно ряду последовательностей операций RDD, для созданияDAG-ориентированный ациклический граф
  3. После того, как DAGScheduler получает DAG-ориентированный ациклический граф, он выполняется по широкой зависимостиstageразделение. Внутри каждого этапа есть много, которые могут работать параллельноtask, и, наконец, один за другим инкапсулируются в набор задач, а затем помещаютсяtaskSetОтправить в TaskScheduler
  4. После того, как TaskScheduler получает коллекцию taskSet, он по очереди проходит и извлекает каждую задачу, отправленную на рабочий узел.executorработает в процессе.
  5. Все задачи выполняются, и задача завершается

3.2 искровая перегородка

При секционировании данных RDD по умолчанию используется HashPartitioner. Эта функция хэширует ключ, а затем модулирует общее количество разделов. Если результат по модулю одинаков, он будет назначен одному и тому же разделу.)

Если вы считаете, что HashPartitioner имеет одну функцию, вы можете настроить разделитель.Настраиваемый разделитель примерно разделен на 3 этапа.

  1. Наследовать org.apache.spark.Partitioner
  2. Переопределить метод numPartitions
  3. Переопределить метод getPartition

3.3 Маленький корпус (с использованием Scala)

Мы хотим разделить в соответствии с длиной ключа RDD, и длина одного и того же ключа входит в тот же раздел (код относительно прост и не будет объясняться)

3.3.1 основной метод

//todo:使用自己实现的自定义分区
object TestPartitionerMain {

  def main(args: Array[String]): Unit = {
    //1、构建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("TestPartitionerMain").setMaster("local[2]")

    //2、构建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")

    //3、构建数据源
    val data: RDD[String] = sc.parallelize(List("hadoop","hdfs","hive","spark","flume","kafka","flink","azkaban"))

    //4、获取每一个元素的长度,封装成一个元组
    val wordLengthRDD: RDD[(String, Int)] = data.map(x=>(x,x.length))

    //5、对应上面的rdd数据进行自定义分区
    val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))

    //6、保存结果数据到文件
    result.saveAsTextFile("./data")

    sc.stop()

  }
}

3.3.2 Пользовательский раздел MyPartitioner

//自定义分区
class MyPartitioner(num:Int) extends Partitioner{
  //指定rdd的总的分区数
  override def numPartitions: Int = {
    num
  }

  //消息按照key的某种规则进入到指定的分区号中
  override def getPartition(key: Any): Int ={
    //这里的key就是单词
    val length: Int = key.toString.length

    length match {
      case 4 =>0
      case 5 =>1
      case 6 =>2
      case _ =>0
}

} }

3.4 Общие переменные Spark

3.4.1 Переменная вещания Spark (переменная вещания)

Распределенный код выполнения в Spark необходимо передать задаче каждого исполнителя для запуска. Для некоторых данных только для чтения и фиксированных данных (например, данных, считанных из БД) Драйвер необходимо каждый раз транслировать в каждую Задачу, что неэффективно.

Широковещательные переменные позволяют передавать переменные отдельным Исполнителям. Затем каждая задача на исполнителе получает переменные от диспетчера блоков узла, на котором она расположена, вместо получения переменных от драйвера, чтобы снизить затраты на связь и использование памяти, тем самым повысив эффективность.

3.4.2 Схематическая диаграмма широковещательных переменных

Эта схема проста для понимания.На самом деле, это слово является данными, требуемыми задачами.Мы отправим слово исполнителю.После того, как задача получит данные слова, она может напрямую найти соответствующую ссылку в памяти .

3.4.3 Использование широковещательных переменных

  1. Объект Broadcast[T] создается путем вызова SparkContext.broadcast для объекта типа T. Любой сериализуемый тип может сделать это
  2. Доступ к значению объекта через свойство value
  3. Переменная будет отправлена ​​каждому узлу только один раз и должна рассматриваться как значение только для чтения (изменение этого значения не повлияет на другие узлы).

3,4.4 Простой пример кода

Пример кода без широковещательных переменных

//这里的word单词为在每一个task中进行传输
val conf = new SparkConf().setMaster("local[2]").setAppName("brocast")
val rdd1=sc.textFile("/words.txt")
val word="spark"
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(word))
rdd2.foreach(x=>println(x))

Пример кода с использованием широковещательных переменных

val conf = new SparkConf().setMaster("local[2]").setAppName("brocast")
val sc=new SparkContext(conf)
val rdd1=sc.textFile("/words.txt")
val word="spark"
//通过调用sparkContext对象的broadcast方法把数据广播出去
val broadCast = sc.broadcast(word)

//在executor中通过调用广播变量的value属性获取广播变量的值
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))
rdd2.foreach(x=>println(x))

3.4.5 Меры предосторожности при использовании широковещательных переменных

  1. Невозможно транслировать RDD с использованием широковещательных переменных

  2. Широковещательные переменные могут быть определены только на стороне драйвера, а не на стороне исполнителя.

  3. Значение широковещательной переменной может быть изменено на стороне драйвера, но не может быть изменено на стороне исполнителя.

  4. Если на стороне исполнителя используется переменная драйвера, а широковещательная переменная не используется, копий переменной драйвера столько, сколько задач в исполнителе.

  5. Если на стороне Исполнителя используется переменная Драйвера, если используется широковещательная переменная, в каждом Исполнителе есть только одна копия переменной стороны Драйвера.

3.4.6 Аккумулятор искры

Аккумулятор — это механизм распределенных переменных, представленный в Spark.Его принцип аналогичен MapReduce, то есть распределяет изменения, а затем агрегирует эти изменения. Обычно это используется для подсчета событий во время выполнения задания во время отладки. Аккумуляторы можно использовать для глобальных подсчетов.

Как использовать

  1. Создайте аккумулятор с начальным значением, вызвав метод ==SparkContext.accumulator(initialValue) == в драйвере. Возвращаемое значение — это объект org.apache.spark.Accumulator[T], где T — это тип initialValue.
  2. Код исполнителя в замыкании искры (сериализация функции) может использовать метод добавления аккумулятора для увеличения значения аккумулятора.
  3. Программа-драйвер может вызвать свойство value аккумулятора, чтобы получить доступ к значению аккумулятора.

3.5 Сериализация программ Spark

3.5.1 Почему операции преобразования нуждаются в сериализации

Spark — это механизм распределенного выполнения, а его основная абстракция — эластичный распределенный набор данных RDD, который представляет данные, распределенные по разным узлам. Вычисление Spark выполняется распределенным образом на исполнителе, поэтому операции преобразования (замыкания), такие как map, flatMap, reduceByKey и другие RDD, разработанные пользователями, имеют следующий процесс выполнения:

  1. Объекты в коде сериализуются локально в драйвере
  2. Объект сериализуется и передается на удаленный узел-исполнитель
  3. Узел удаленного исполнителя десериализует объект
  4. Окончательное выполнение удаленного узла

Следовательно, если объект необходимо сериализовать и передать по сети во время выполнения, он должен пройти процесс сериализации.

3.5.2 Исключение сериализации задач Spark

При написании искровых программ внешние переменные и функции используются внутри таких операторов, как map и foreachPartition, что приводит к проблеме несериализации задачи. Однако использование оператором искры внешних переменных в процессе расчета действительно во многих случаях неизбежно, например, оператор фильтра фильтрует по заданным извне условиям, а карта преобразуется по соответствующей конфигурации.

Например, возникает ошибка «org.apache.spark.SparkException: Task not serializable»: причина в том, что эти операторы используют внешние переменные, но эту переменную нельзя сериализовать. Текущий класс использует объявление "extends Serializable" для поддержки сериализации, но поскольку некоторые поля не поддерживают сериализацию, это все равно вызовет проблемы при сериализации всего класса и в конечном итоге приведет к проблеме десериализации задачи.

3.5.3 Решение для сериализации в spark

  1. Если объект этого класса используется в функции, класс должен быть сериализован
  2. Если в функции используются переменные-члены объекта этого класса, все переменные-члены класса должны быть сериализованы, кроме сериализации.
  3. Используйте аннотацию «@transient» для несериализуемых переменных-членов, чтобы сообщить компилятору, что сериализация не требуется.
  4. Вы также можете независимо поместить зависимые переменные в небольшой класс, чтобы этот класс поддерживал сериализацию, что может уменьшить объем сетевой передачи и повысить эффективность.
  5. Создание объекта может быть встроено непосредственно в эту функцию, чтобы избежать необходимости сериализации.

3.6 Spark on Yarn

Вы можете отправить программу spark в пряжу для запуска.В это время вычислительные ресурсы, необходимые для задачи spark, выделяются боссом ResourceManager в пряже.

Информационный адрес официального сайта:spark.apache.org/docs/2.3.3/… spark-env.sh

В соответствии с различными методами распространения драйверов в приложениях Spark, Spark на YARN имеет два режима: режим пряжи-клиента и режим пряжи-кластера.

3.6.1 режим пряжи-кластера

Пример отправки задач в режиме yarn-cluster

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/opt/spark/examples/jars/spark-examples_2.11-2.3.3.jar \
10

На этом этапе обратите внимание, что если в операции возникает ошибка, возможно, недостаточно виртуальной памяти, и вы можете добавить параметры

<!--容器是否会执行物理内存限制默认为True-->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>

<!--容器是否会执行虚拟内存限制    默认为True-->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

3.6.2 режим пряжи-клиента

spark-submit --class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode client
--driver-memory 1g
--executor-memory 1g
--executor-cores 1
/opt/spark/examples/jars/spark-examples_2.11-2.3.3.jar
10

3.6.3 Схема

пряжа-кластер:

пряжа-клиент:

Их отличия:

Режим пряжи-кластера: программа-драйвер программы spark запускается в YARN, результаты выполнения не могут отображаться на клиенте, а клиент может исчезнуть после запуска приложения. Лучше запускать те, которые со временем сохранят результаты на внешнем носителе (например, HDFS, Redis, Mysql), а клиентский терминал будет отображать только простой статус задания как задание YARN.

Режим yarn-client: ==Драйвер программы spark запускается на клиенте==, и результаты работы приложения будут отображаться на клиенте, все это подходит для приложений с выводом результатов (например, spark-shell)

Самая большая разница в том, что расположение стороны водителя отличается.

пряжа-кластер: сторона драйвера работает в пряже кластера, вместе с процессом ApplicationMaster.

Yarn-Client: сторона драйвера работает на клиенте, который представляет задачу, она не имеет ничего общего с процессом ApplicationMaster, и часто используется для тестирования

3.7 Проблемы с операциями оператора сбора

Функция оператора сбора состоит в том, чтобы действовать как операция действия, которая запускает выполнение задачи, собирает данные RDD и возвращает их стороне драйвера в виде массива.

Уведомление:

Размер памяти драйвера по умолчанию составляет 1 ГБ, который задается параметром spark.driver.memory.

Если объем данных rdd превышает память 1G по умолчанию на стороне драйвера, а операция сбора вызывается на rdd, на стороне драйвера произойдет переполнение памяти.Все эти операции сбора имеют определенные риски, и фактическая разработка код обычно не используется.

На реальных предприятиях этот параметр обычно увеличивается, например, 5G/10G и т. д., и этот параметр можно изменить с помощью new SparkConf().set("spark.driver.memory","5G")

3.8 Анализ параметров ресурсов в задачах Spark

executor-memory: указывает размер памяти, необходимый для каждого процесса-исполнителя, который определяет скорость более поздних данных операции.

total-executor-cores: указывает общее количество ядер ЦП, необходимых для выполнения задачи, что определяет степень детализации параллельного выполнения задачи.

Для оптимизации искровой программы на более позднем этапе вы можете начать с этих двух параметров.Независимо от того, какой параметр вы увеличите, эффективность программы будет в определенной степени повышена. Увеличение вычислительных ресурсов — самый прямой и эффективный метод оптимизации. В случае ограниченных вычислительных ресурсов можно учитывать другие аспекты, такие как уровень кода, уровень JVM и т. д.

finally

···