Анализ раздела Spark RDD раздела данных Spark

Spark

Эта статья подготовлена ​​технической группой OPPO Internet и является второй статьей в серии «Анализ разделов данных Spark», посвященной анализу разделов данных Spark RDD. Эта серия разделена на 3 статьи, пожалуйста, продолжайте обращать внимание.

Пожалуйста, обратите внимание на автора для перепечатки и приглашаем обратить внимание на общедоступный аккаунт команды интернет-технологий OPPO: OPPO_tech, чтобы вместе делиться передовыми интернет-технологиями и деятельностью OPPO.

Spark

Мы берем Spark on Yarn в качестве примера, чтобы проиллюстрировать принцип работы Spark.

Шаги выполнения задачи

1. Клиент подает заявку на РМ, а РМ определяет, соответствуют ли ресурсы кластера требованиям;

2. RM выбирает NodeManager в кластере для запуска Application Master (кластерный режим);

3. Драйвер запускает процесс в узле NodeManager, где находится AM;

4. AM обращается за ресурсами к ResourceManager и запускает соответствующие исполнители на каждом NodeManager;

5. Драйвер начинает планировать задачи и формирует диаграмму кровных отношений RDD, то есть диаграмму DAG, с помощью операции Transaction и, наконец, запускает задание и планирует выполнение с помощью вызова Action;

6. DAGScheduler отвечает за планирование на уровне этапов, в основном разделяя DAG на несколько этапов, упаковывая каждый этап в набор задач и отправляя его в TaskScheduler для планирования;

7. TaskScheduler отвечает за планирование на уровне задач и распределяет набор задач, предоставленный DAGScheduler, исполнителю для выполнения в соответствии с указанной политикой планирования;

Spark RDD

Эластичный распределенный набор данных RDD, RDD содержит 5 функций

1.Compute:

RDD основан на разделах во время расчета задачи, а данные каждого сегмента получаются посредством вычислений.Различные подклассы RDD могут реализовывать свои собственные методы вычислений;

2.getPartitions:

Вычислите, чтобы получить список всех разделов.RDD представляет собой набор разделов.RDD имеет один или несколько разделов.Количество разделов определяет параллелизм задач Spark;

3.getDependencies:

Получите зависимости RDD, у каждого RDD есть зависимости (зависимости исходного RDD пусты), и эти зависимости становятся родословными;

4.getPreferredLocations:

Список зависимостей от других RDD. Когда Spark планирует задачи, он попытается распределить задачи на компьютер, на котором находятся данные, избегая тем самым передачи данных между компьютерами. Метод получения RDD предпочтительного местоположения — getPreferredLocations, задействует внешний. Существует приоритетная позиция при чтении данных в структуре хранения, например HadoopRDD, ShuffleRDD;

5. Разделитель:

Определяет, какому разделу назначены данные.Для RDD типа не ключ-значение Partitioner имеет значение None.Для RDD типа ключ-значение Partitioner по умолчанию имеет значение HashPartitioner. При выполнении операций тасования, таких как reduceByKey, sortByKey, Partitioner определяет, как данные в соответствующем разделе отображаются при выводе родительского тасования RDD;

Первые две функции обязательны для всех RDD, а последние три являются необязательными.Все RDD наследуют этот интерфейс.

Spark Partition

Из-за большого объема данных в СДР для облегчения вычислений необходимо разделить СДР и хранить его в разделах каждого узла, чтобы при выполнении различных вычислительных операций над СДР данные в каждом раздел на самом деле рассчитывается выполнять параллельные операции.

То есть часть необработанных данных для обработки будет разделена на несколько частей в соответствии с соответствующей логикой.Каждая часть данных соответствует разделу RDD.Количество разделов определяет количество задач и влияет на параллелизм программы. , Разделы связаны, то есть говорят, что каждый RDD имеет соответствующую реализацию раздела.

HadoopRDD

Spark часто требуется читать файлы из hdfs для создания RDD, а затем выполнять вычислительный анализ. Этот RDD, созданный путем чтения файлов из hdfs, называется HadoopRDD.

HadoopRDD в основном переписывает три метода интерфейса RDD:

  1. override def getPartitions: Array[Partition]
  2. override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)]
  3. override def getPreferredLocations(split:Partition): Seq[String]

Логика для определения количества разделов находится в getPartitions, который фактически вызывает InputFormat.getSplits,

InputFormat — это интерфейс: org.apache.hadoop.mapred.InputFormat, где getInputSplit показан на рисунке 7.

Из анализа исходного кода видно, что в сценарии HadoopRDD количество разделов RDD было определено до создания RDD, что определяется параметрами HADOOP, Мы можем настроить его:

spark.hadoop.mapreduce.input.fileinputformat.split.minsize; spark.hadoop.mapreduce.input.fileinputformat.split.maxsize;

Чтобы настроить размер количества разделов Hadooprdd.

Разделение в Spark SQL

Наконец, Spark SQL преобразует операторы SQL через деревья логических операторов в деревья физических операторов.

В физическом дереве операторов узел SparkPlan листового типа отвечает за создание RDD с нуля.Каждый узел SparkPlan нелистового типа эквивалентен выполнению преобразования в RDD, то есть преобразованию его в новый путем вызова функцию execute().RDD и, наконец, выполнить операцию collect(), чтобы инициировать вычисление и вернуть результат пользователю.

Сосредоточьтесь на анализе конечных узлов:

В Spark SQL SparkPlan типа LeafExecNode отвечает за создание начального RDD.

HiveTableScanExec будет напрямую генерировать HadoopRDD на основе информации HDFS, хранящейся в таблице данных Hive; FileSourceScanExec будет генерировать FileScanRDD на основе исходного файла, в котором находится таблица данных.

При чтении и записи таблиц Parquet в хранилище метаданных Hive метод преобразования контролируется spark.sql.hive.convertMetastoreParquet .

По умолчанию имеет значение true, если установлено значение true

Будет использовано :

org.apache.spark.sql.execution.FileSourceScanExec,

В противном случае используйте:

org.apache.spark.sql.hive.execution.HiveTableScanExec

В настоящее время FileSourceScanExec включает создание RDD таблицы сегментов и таблицы RDD без сегментов.В любом случае окончательный сгенерированный файл представляет собой FileRDD.

RDD, который создает таблицу без сегментов, проанализирован ниже.

Метод getPartition FileRDD:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

Для получения maxSplitBytes определяющим фактором являются следующие три параметра:

В заключение:

Если вы хотите увеличить значение maxSplitBytes, то есть количество разделов станет меньше.

Увеличивая значение defaultMaxSplitBytes,

Это spark.sql.files.maxPartitionBytes,

Также увеличьте spark.sql.files.openCostInBytes;

Если вы хотите уменьшить значение maxSplitBytes, то есть количество разделов станет больше.

Значение defaultMaxSplitBytes можно уменьшить,

Это spark.sql.files.maxPartitionBytes,

Уменьшите также spark.sql.files.openCostInBytes.

Далее анализируется RDD FileSourceScanExec для создания таблицы сегментов.

При анализе исходного кода между количеством разделов в таблице сегментов и количеством сегментов существует взаимосвязь один к одному.

HiveTableScanExec

HiveTableScanExec будет напрямую генерировать HadoopRDD на основе информации HDFS, хранящейся в таблице данных Hive.

Обычно HiveTableScanExec разбит на разделы по количеству файлов, размеру.

Например:

Чтение данных размером 2048M, размер блока hdfs установлен на 128M

  1. Каталог имеет 1000 небольших файлов

Ответ: будет создано 1000 разделов.

  1. Если есть только 1 файл,

Ответ: будет создано 16 разделов.

  1. Если есть большой файл 1024 м, остальные 999 файлов находятся на 1024 м

Ответ: будет создано 1007 разделов.

Для настройки типа HiveTableScanExec обратитесь к HadoopRDD.

RDD transformation

RDD преобразуется в другой RDD путем преобразования, так каково же количество вновь созданных разделов?

  1. filter(), map(), flatMap(), distinct()

Количество разделов равно количеству родительских RDD.

  1. rdd.union(other_rdd)

Количество разделов равно rdd_size + other_rdd_size

  1. rdd.intersection(other_rdd)

Количество разделов равно max(rdd_size, other_rdd_size)

  1. rdd.subtract(other_rdd)

Количество разделов равно rdd_size

  1. rdd.cartesian(other_rdd)

Количество разделов равно rdd_size * other_rdd_size

Объединение и перераспределение RDD

Иногда необходимо сбросить количество разделов RDD.Например, в разделах RDD много разделов RDD, но число каждого RDD относительно невелико.Увеличение количества разделов может увеличить параллелизм задач, но может причиной объема данных в каждом разделе.Если он слишком мал, объем данных раздела слишком мал, так что доля времени связи между узлами во всем времени выполнения задачи увеличивается, поэтому необходимо установить более разумный раздел.

Существует два способа сбросить раздел RDD: метод «coalesce()» и «repartition()» соответственно.

Перераспределение — это особый случай, когда перетасовка выполняется в функции объединения.

В распределенных вычислениях каждый узел вычисляет только часть данных, то есть обрабатывает только один шард.Если вы хотите получить все данные, соответствующие ключу, например, reduceByKey и groupByKey, вам нужно вытащить данные того же ключа to В том же разделе данные исходного раздела необходимо перетасовать и реорганизовать.Этот процесс переразметки данных по определенным правилам называется Shuffle.

Shuffle — это мост между Map и Reduce, описывающий процесс передачи данных из Map в Reduce.

Дополнительные перетасовки полезны при увеличении параллелизма. Например, если некоторые файлы в данных являются неделимыми, раздел, соответствующий большому файлу, будет иметь большое количество записей вместо того, чтобы распределять данные по как можно большему количеству разделов, чтобы использовать все заявленные ЦП. В этом случае использование Reparition для повторного создания большего количества разделов для обеспечения параллелизма, требуемого более поздними операторами преобразования, значительно повысит производительность.

Проанализируйте исходный код функции объединения

Shuffle = true

Данные хешируются на основе количества новых разделов, выходной раздел выбирается случайным образом, а данные входного раздела

вывод в выходной раздел.

Shuffle = false

Проанализируйте метод getPartitions исходного кода CoalescedRDD.

Роль PartitionCoalescer:

  1. Убедитесь, что каждый раздел CoalescedRDD в основном соответствует одному и тому же количеству родительских разделов RDD;

  2. Попробуйте работать с каждым разделом Coalescedrdd со своим родительским RDD. Например, раздел 1 COALESCEDRDD соответствует его 10 разделу от 1 до 10 родительского RDD, но от 1 до 7 Эти семь разделов на узле 1.1.1.1, тогда узел, к которому объединен раздел 1, равен 1.1.1.1. Целью этого является сокращение передачи данных между узлами, повышение вычислительной мощности;

  3. Разделы CoalescedRDD максимально распределяются по разным узлам для выполнения; для конкретной реализации обратитесь к классу DefaultPartitionCoalescer.

Ниже приведен пример анализа Repartition и Coalesce.

Предполагая, что исходный RDD имеет N разделов, его необходимо повторно разделить на M разделов.

1. Реализация переразметки:

если N

Как правило, разделы N имеют неравномерное распределение данных.Используйте функцию HashPartitioner, чтобы переразбить данные на разделы M, затем вам нужно установить в случайном порядке значение true;

2. Реализация объединения:

если Н>М

  1. N и M аналогичны (если N равно 1000, M равно 100), то несколько разделов в N разделах могут быть объединены в новый раздел и, наконец, объединены в M разделов, тогда для перемешивания можно установить значение false ;

  2. Если M>N, объединение недопустимо, процесс перемешивания не выполняется, и между родительским RDD и дочерним RDD существует узкая зависимость, которая не может увеличить количество файлов (разделов). Короче говоря, если перемешивание ложно, если входящий параметр больше, чем существующее количество разделов, количество разделов RDD останется неизменным, то есть количество разделов RDD не может быть увеличено без перемешивания;

  3. Если N>M и разница между ними очень велика, то это зависит от соотношения между количеством исполнителей и создаваемым разделом.Если количество исполнителей

Эпилог

Spark, как один из наиболее широко используемых вычислительных механизмов больших данных, широко используется в аналитиках данных и известен своей высокой скоростью обработки.Благодаря приведенному выше анализу мы можем использовать разумные вычислительные ресурсы, включая ЦП, память, исполнители для выполнения вычислительных задач. , что делает наш кластер более эффективным и позволяет получить больше результатов задач при том же сценарии вычислительных ресурсов.