предисловие
1. Анализ принципа тасования
1.1 Обзор перемешивания
Перетасовка предназначена для реорганизации данных Из-за характеристик и требований распределенных вычислений детали реализации более громоздки и сложны.
В структуре MapReduce Shuffle является мостом между Map и Reduce.Этап Map считывает данные посредством перемешивания и выводит их в соответствующий Reduce, а этап Reduce отвечает за извлечение данных из Map и выполнение вычислений. В течение всего процесса тасования он часто сопровождается большим объемом дисковых и сетевых операций ввода-вывода. Следовательно, производительность тасования также напрямую определяет производительность всей программы. Spark также будет иметь собственный процесс реализации в случайном порядке.
1.2 Введение в перемешивание в Spark
В процессе планирования DAGStageРазделение этапов основано на том, есть ли процесс тасования, то есть наличиеширокая зависимостьКогда вам нужно перетасовать, это время будетjobРазделенный на несколько этапов, каждый этап имеет множество вещей, которые могут выполняться параллельно.Task.
Процесс между этапом и этапом называется этапом перемешивания.В Spark компоненты, отвечающие за выполнение, расчет и обработку процесса перемешивания, в основномShuffleManager. С развитием Spark ShuffleManager можно реализовать двумя способами, а именно:HashShuffleManagerа такжеSortShuffleManager, так что у искры Shuffle естьHash Shuffleа такжеSort Shuffleдва вида.
1.3 Механизм HashShuffle
1.3.1 Введение в HashShuffle
существуетSpark 1.2Раньше механизм вычислений в случайном порядке по умолчанию былHashShuffleManager.
HashShuffleManagerЕсть очень серьезный недостаток, то есть будет генерироваться большое количество промежуточных дисковых файлов, и тогда на производительность будет влиять большое количество дисковых операций ввода-вывода. Таким образом, в Spark 1.2 и более поздних версиях по умолчаниюShuffleManagerизменился наSortShuffleManager.
SortShuffleManagerПо сравнению с HashShuffleManager есть определенные улучшения. Основная причина заключается в том, что когда каждая Задача выполняет операцию перемешивания, хотя и будет создано больше временных файлов на диске, все временные файлы в конце будут объединены в один файл на диске, поэтому каждая Задача имеет только один файл на диске. Когда задача чтения в случайном порядке на следующем этапе извлекает свои собственные данные, ей нужно только прочитать часть данных в каждом файле на диске в соответствии с индексом.
Перетасовка хешей — это перетасовка без сортировки.
1.3.2 Перетасовка хэшей общего механизма
Операционный механизм HashShuffleManager в основном делится на два типа: один — обычный операционный механизм, а другой — комбинированный операционный механизм, и комбинированный механизм в основном оптимизирует количество небольших файлов, созданных в процессе перемешивания, за счет повторного использования буферов.
Кратко поясните ситуацию. На данный момент задача разделена на два этапа, первый этап имеет 4 MapTask вверху, а второй этап имеет 3 ReduceTask, но если наша текущая MapTask увеличится до 1000, то файл блока, который мы сгенерируем, не будет так много MapTask * 3, в настоящее время большое количество операций ввода-вывода вызовет большие проблемы с производительностью
1.3.3 Подробное описание этапов перемешивания хэшей по общему механизму
Здесь мы сначала разъясняем посылку:Каждый Executor имеет только 1 ядро процессора, то есть независимо от того, сколько потоков задач выделено этому Executor, одновременно может выполняться только один поток задач..
На рисунке представлены 3 задачи ReduceTask. Начиная с ShuffleMapTask, каждая из них выполняет вычисление хэша (разделитель: hash/numReduce по модулю) и классифицирует их по 3 различным категориям. Каждая задача ShuffleMapTask делится на 3 категории данных. Различные данные агрегируются, а затем вычисляется окончательный результат.Поэтому ReduceTask будет собирать данные, относящиеся к своей собственной категории, и объединять их в большой набор той же категории.Каждый ShuffleMapTask выводит 3 локальных файла.Здесь 4 ShuffleMapTasks, поэтому всего 4 x 3 классифицированных файла = было выведено 12 локальных небольших файлов.
Shuffle Writeсцена:
Основная причина заключается в том, что после того, как этап завершает вычисление, для следующего этапа могут выполняться операции, подобные перетасовке (такие как reduceByKey, groupByKey), а данные, обрабатываемые каждой задачей, разделяются по ключу. Так называемое «разделение» заключается в выполнении алгоритма хеширования для одного и того же ключа, так что один и тот же ключ записывается в один и тот же файл на диске, и каждый файл на диске принадлежит только одной задаче этапа на стороне сокращения. Перед записью данных на диск данные сначала записываются в буфер памяти, а когда буфер памяти заполняется, они переполняются в файл на диске.
Итак, сколько дисковых файлов должно быть создано для следующего этапа для каждой задачи, которая выполняет Shuffle Write?Это очень просто, сколько задач находится на следующем этапе, и сколько дисковых файлов создается для каждой задачи на текущем этапе. Например, на следующем этапе всего 100 задач, тогда каждая задача текущего этапа должна создать 100 файлов на диске. Если на текущем этапе 50 задач, всего 10 Исполнителей, и каждый Исполнитель выполняет 5 Задач, то всего на каждом Исполнителе будет создано 500 дисковых файлов, а на всех Исполнителях будет создано 5000 дисковых файлов. Можно видеть, что количество дисковых файлов, сгенерированных неоптимизированной операцией случайной записи, чрезвычайно ошеломляет.
Shuffle Readсцена:
Случайное чтение обычно выполняется в начале этапа. В это время каждой задаче этапа нужно подтянуть все одинаковые ключи в результатах вычислений предыдущего этапа от каждого узла через сеть к узлу, где он находится, а затем выполнить операции агрегации ключей или соединения. В процессе случайной записи задача создает дисковый файл для каждой задачи этапа на стороне Reduce, поэтому в процессе случайного чтения каждой задаче нужно только вытащить свой с узлов, где находятся все задачи вышестоящего stage находятся на диске.
Процесс извлечения случайного чтения заключается в агрегировании во время извлечения. Каждая задача чтения в случайном порядке будет иметь свой собственный буферный буфер, и каждый раз она может извлекать данные только того же размера, что и буферный буфер, а затем выполнять агрегацию и другие операции через Map в памяти. После агрегирования пакета данных извлекается следующий пакет данных и помещается в буфер для операций агрегирования. И так далее, пока наконец не будут подтянуты все данные и не будет получен окончательный результат.
Уведомление:
Буфер играет роль кэша.Кэш может ускорить запись на диск и повысить эффективность вычислений.Размер буфера по умолчанию 32k.
Partitioner: По модулю hash/numRecue определяется, что данные обрабатываются несколькими Reduces, а также определяется запись в несколько буферов
блочный файл: файл на маленьком диске, из рисунка мы можем узнать формулу расчета количества файлов на маленьком диске: блочный файл=M*R. M — количество задач карты, а R — количество Reduces.Как правило, количество Reduces равно количеству буферов, которые определяются разделителем.
Проблемы с общим механизмом Hash shuffle:
На этапе Shuffle на диске будет создано большое количество небольших файлов, а также увеличится количество раз установления связи и извлечения данных.В это время будет создано большое количество трудоемких и неэффективных операций ввода-вывода. (потому что создается слишком много маленьких файлов)
Это может привести к OOM, большому количеству трудоемких и неэффективных операций ввода-вывода, что приводит к слишком большому количеству объектов при записи на диск и слишком большому количеству объектов при чтении с диска.Эти объекты хранятся в куче памяти, что приведет к нехватке кучи памяти, что соответственно приведет к частому GC, GC вызывает OOM. Поскольку в памяти необходимо хранить большое количество дескрипторов файловых операций и временной информации, если масштаб обработки данных относительно велик, память не выдержит, и возникнут такие проблемы, как OOM.
1.3.4 Перетасовка хэшей механизма слияния
Механизм слияния заключается в повторном использовании буферного буфера, а конфигурация для включения механизма слияния — spark.shuffle.consolidateFiles. Значение по умолчанию для этого параметра — false, и установка его в true включает механизм оптимизации. Вообще говоря, если мы используем HashShuffleManager, рекомендуется включить эту опцию.
Здесь 6 shuffleMapTasks.Категория данных по-прежнему делится на 3 типа, потому что алгоритм Hash будет классифицироваться в соответствии с вашим Key.В одном и том же процессе, сколько бы ни было Tasks, один и тот же Key будет помещен в один и тот же в буфере, а затем записать данные из буфера в локальный файл с числом ядер в качестве единицы (у ядра есть только один тип ключевых данных), а в процессе, где находится каждая задача, записывается в общий процесс 3 локальных файла, здесь 6 shuffleMapTasks, поэтому общий вывод 2 ядра x 3 классифицированных файла = 6 небольших локальных файлов.
В настоящее время файл блока = Core * R, Core — это количество ядер ЦП, а R — количество редукторов.Однако, если на стороне редуктора слишком много параллельных задач или фрагментов данных, задача Core * Reducer по-прежнему будет выполняться. быть слишком большим, и будет создано много мелких задач.
1.4 Sort shuffle
Рабочий механизм SortShuffleManager в основном делится на два типа:нормальный рабочий механизма такжеобходной механизм
1.4.1 Общий механизм сортировки в случайном порядке
В этом режиме данные сначала будут записаны в структуру данных, а оператор агрегации будет записан в Map, при этом локальная агрегация через Map будет записана в память за один проход. Оператор Join записывается в ArrayList прямо в память. Затем необходимо судить о том, достигнут ли порог (5M), при достижении которого данные структуры данных памяти будут записаны на диск, а структура данных памяти будет очищена.
Перед переполнением на диск выполните сортировку по ключу, и отсортированные данные будут записываться в файл на диск пакетами. Пакет по умолчанию — 10 000, и данные будут записываться в файл на диск партиями по 10 000. Запись файлов на диск выполняется путем переполнения буфера, и каждое переполнение создает файл на диске, то есть процесс задачи создает несколько временных файлов.
Наконец, в каждой задаче все временные файлы объединяются.Это процесс слияния.Этот процесс считывает все временные файлы и записывает их в окончательный файл один раз. Это означает, что все данные задачи находятся в этом одном файле. В то же время записывается отдельный файл индекса для определения смещения начала индекса и смещения конца данных каждой нижестоящей задачи в файле (например, для wordCount, где индекс идет от (начало смещения) до места (конец смещение) это слово).
Преимущества этого механизма:
- Маленьких файлов значительно меньше, а задача генерирует только один файл
- Файлы файлов в целом в порядке, а с помощью индексного файла поиск становится быстрее.Хотя сортировка немного снижает производительность, поиск становится намного быстрее.
1.4.2 sortShuffle в обходном режиме
Условием работы механизма обхода является то, что количество задач тасования карт меньше значения параметра spark.shuffle.sort.bypassMergeThreshold (значение по умолчанию — 200), и это не агрегированный оператор тасования (такой как reduceByKey )
количество shuffleMapTaskменьше, чем значение по умолчанию 200При включенном sortShuffle в обходном режиме сортировка не выполняется, причина в том, что объем самих данных относительно невелик, и нет необходимости выполнять полную сортировку, поскольку объем данных мал, сама скорость запроса fast, который просто экономит часть накладных расходов на производительность.
1.5 Используемые параметры
1.5.1 spark.shuffle.file.buffer
Размер буфера по умолчанию — 32 КБ. Чтобы уменьшить количество операций записи при переполнении диска, можно соответствующим образом настроить размер этого значения. Уменьшить дисковый ввод-вывод
1.5.2 spark.reducer.MaxSizeFlight
Размер объема данных, получаемых с помощью ReduceTask, по умолчанию составляет 48 МБ.
1.5.3 spark.shuffle.memoryFraction
Доля памяти агрегата в случайном порядке, размер доли памяти исполнителя
1.5.4 spark.shuffle.io.maxRetries
Количество попыток извлечения данных для предотвращения влияния дрожания сети.
1.5.5 spark.shuffle.io.retryWait
Отрегулируйте интервал повторных попыток, сколько времени потребуется для повторного извлечения после сбоя извлечения.
1.5.6 spark.shuffle.consolidateFiles
Механизм слияния для HashShuffle
1.5.7 spark.shuffle.sort.bypassMergeThreshold
Механизм обхода SortShuffle, по умолчанию 200 раз
1.5.8 spark.sql.shuffle.partitions
По умолчанию 200, количество разделов, используемых в случайном порядке, то есть часть-00000, часть-00001 и т. д., которые вы создаете, может быть не более части-00199.
finally
···