Эта статья расскажет вам обо всех аспектах настройки Spark Core.

Spark

предисловие

Примечания к этой статье

  1. Прежде чем читать эту статью, вы можете поискать на Baidu десять принципов разработки программ Spark.
  2. Хоть статья и очень длинная, содержание не скучное, и это вся галуна во время интервью (думаю 🤣) можно есть в сочетании с каталогом на стороне ПК, а можно сразу перейти к части контент, который вы хотите
  3. Рисунки — очень важная и самая ценная часть статьи. Если это не очень важно, я обычно не рисую сам.
  4. Эта статья будет в значительной степени опираться на содержание статей Meituan и официальную информацию Spark для иллюстрации, а также будет сочетать собственное понимание автора.
  5. Часть перекоса данных тесно связана с настройкой Spark Streaming.

1. Кратко опишите десять принципов разработки Spark

Здесь мы кратко упомянем о них и не будем подробно раскрывать их подробные описания, которые вы можете найти через поисковые системы. Мы используем самые прямые слова, чтобы объяснить

1.1 Избегайте создания дубликатов RDD

В прямом смысле,Для одних и тех же данных следует создавать только один RDD, и нельзя создавать несколько RDD для представления одних и тех же данных.. Отказ от нашего задания Spark приведет к выполнению нескольких вычислений для создания нескольких RDD, представляющих одни и те же данные, что приведет к увеличению производительности задания.

1.2 как можно чаще повторно использовать один и тот же RDD

В случаях, когда данные, такие как несколько RDD, перекрываются или содержат, мы должныСтарайтесь как можно чаще повторно использовать RDD, чтобы максимально сократить количество RDD, тем самым максимально уменьшив количество выполнений операторов.Потому что если RDD в Spark не кешируется, то он каждый раз будет пересчитываться из исходника

Например, если формат данных одного СДР имеет тип «ключ-значение», а другой — тип с одним значением, данные значений этих двух СДР абсолютно одинаковы. Тогда мы можем использовать только RDD типа ключ-значение в это время, потому что он уже содержит другие данные.

1.3 Постоянство RDD, которые используются несколько раз

Сохраняйте RDD, которые используются несколько раз. На этом этапе Spark сохранит данные в RDD в памяти или на диске в соответствии с вашей стратегией сохранения. Каждый раз, когда операция оператора будет выполняться над этим СДР в будущем, постоянные данные СДР будут извлекаться непосредственно из памяти или диска, а затем будет выполняться оператор, без пересчета СДР из источника, а затем выполнения операции оператора.. Это гарантирует, что при выполнении нескольких операций оператора над СДР сам СДР вычисляется только один раз.

Добавлено: уровень сохраняемости Spark.

Эти английские на самом деле настолько просты, что вы можете сразу увидеть, что они означают... Я буду использовать жирный шрифт для слов или предложений, которые требуют большего внимания.

Уровень сохраняемости Объяснение значения
MEMORY_ONLY использоватьне сериализованныйФормат объекта Java сохраняет данные вОЗУсередина. Если памяти недостаточно для хранения всех данных, данные могутне будет сохраняться. Затем в следующий раз, когда над этим RDD будет выполняться операторная операция, те данные, которые не были сохранены, должны быть пересчитаны из источника. Это стратегия сохранения по умолчанию, использующаяМетод кэша(), эта стратегия постоянства фактически используется.
MEMORY_AND_DISK использоватьне сериализованныйФормат объекта Java, который предпочтительно пытается хранить данные в памяти. еслиЕсли памяти недостаточно для хранения всех данных, данные будут записаны в файл на диске, данные, сохраненные в файле на диске, будут считаны и использованы при следующем выполнении оператора на этом RDD.
MEMORY_ONLY_SER Основное значение такое же, как MEMORY_ONLY. Единственная разница в том,Данные в RDD будут сериализованы, каждый раздел RDD будет сериализован вбайтовый массив. Сюдасохранить больше памяти, чтобы постоянные данные не занимали слишком много памяти и не приводили к частому сбору мусора.
MEMORY_AND_DISK_SER Основное значение такое же, как MEMORY_AND_DISK. Единственное отличие состоит в том, что данные в RDD будут сериализованы, и каждый раздел RDD будет сериализован в массив байтов. Этот метод экономит больше памяти, тем самым избегая того, чтобы постоянные данные занимали слишком много памяти и приводили к частому сбору мусора.
DISK_ONLY Используя несериализованный формат объекта Java,Записать все данные в файл на диске.
MEMORY_ONLY_2, MEMORY_AND_DISK_2 и т. д. Для любой из вышеперечисленных стратегий сохраняемости добавление суффикса _2 означает, чтоДелается копия каждых постоянных данныхи сохранять копии на других узлах. Этот механизм сохраняемости на основе реплик в основном используется для обеспечения отказоустойчивости. Если узел умирает и постоянные данные в памяти или на диске узла теряются, копия данных на других узлах может использоваться для последующих вычислений RDD. При отсутствии копии данные могут быть пересчитаны только из источника.

(Дополнение) Как выбрать наиболее подходящую стратегию сохранения

По умолчанию самая высокая производительность, конечно, MEMORY_ONLY, все операции основаны на данных в чистой памяти, не требуют чтения данных из файлов на диске и имеют высокую производительность; и не нужно копировать копию данных и передавать это удаленно на другие узлы.

Однако здесь следует отметить, что в реальной производственной среде, вероятно, существуют ограниченные сценарии, в которых эта стратегия может использоваться напрямую.Если в RDD имеется большой объем данных (например, миллиарды), прямое использование этого уровня сохраняемости будет result in Вызывает исключение OOM JVM из-за нехватки памяти.

Если при использовании уровня MEMORY_ONLY происходит переполнение памяти, то рекомендуется попробовать уровень MEMORY_ONLY_SER. Этот уровень будет сериализовать данные RDD, а затем сохранять их в памяти.В настоящее время каждый раздел представляет собой просто массив байтов, что значительно сокращает количество объектов и использование памяти.

Этот уровень имеет большую нагрузку на производительность, чем MEMORY_ONLY, в основном накладные расходы на сериализацию и десериализацию. Однако последующие операторы могут работать на основе чистой памяти, поэтому общая производительность остается относительно высокой. Кроме того, возможные проблемы такие же, как и выше.Если объем данных в RDD слишком велик, это может вызвать исключение переполнения памяти OOM.

Если чистый уровень памяти недоступен, то рекомендуется использовать стратегию MEMORY_AND_DISK_SER. Если эта мера будет принята, это означает, что объем данных в RDD очень велик, и память не может быть полностью проставлена. Сериализованные данные относительно малы, что может сэкономить память и дисковое пространство. При этом стратегия сначала попытается закэшировать данные в памяти, а когда кеша памяти не хватит, запишет на диск.

Вообще не рекомендуется использовать DISK_ONLY и уровень с суффиксом 2: поскольку чтение и запись данных на основе дисковых файлов приведет к резкому снижению производительности, иногда лучше один раз пересчитать все RDD. На уровне с суффиксом 2 все данные должны быть реплицированы и отправлены на другие узлы. Репликация данных и передача по сети вызовут большие потери производительности. Если не требуется высокая доступность задания, это не рекомендуется.

1.4 Старайтесь избегать использования операторов перемешивания

Поскольку наиболее требовательной к производительности частью процесса выполнения задания Spark является процесс перемешивания. Проще говоря, процесс тасования заключается в извлечении одного и того же ключа, распределенного по нескольким узлам кластера, на один и тот же узел и выполнении таких операций, как агрегация или объединение.

Во время процесса тасования один и тот же ключ на каждом узле будет сначала записан в локальный файл на диске, а затем другие узлы должны будут извлечь тот же ключ из файла на диске на каждом узле через передачу по сети. Более того, когда один и тот же ключ извлекается на один и тот же узел для операции агрегирования, на узле может обрабатываться слишком много ключей, что приводит к недостаточному объему памяти, а затем к дисковому файлу. Следовательно, во время процесса тасования может происходить большое количество операций ввода-вывода чтения и записи дисковых файлов и операций передачи данных по сети. Дисковый ввод-вывод и передача данных по сети также являются основными причинами низкой производительности в случайном порядке.

1.5 Операция перемешивания с использованием предварительной агрегации на стороне карты

Так называемая предварительная агрегация на стороне карты означает, что каждый узел локально выполняет операцию агрегации с одним и тем же ключом, подобно локальному объединителю в MapReduce. После предварительной агрегации на стороне карты каждый узел будет иметь только один и тот же ключ локально, поскольку агрегируются несколько идентичных ключей. Когда другие узлы извлекают один и тот же ключ на всех узлах, это значительно уменьшает объем данных, которые необходимо извлечь, тем самым уменьшая дисковый ввод-вывод и накладные расходы на передачу по сети.

Самый очевидный пример — это reduceByKey и groupByKey, потому что groupByKey не будет предварительно агрегироваться.

группа по ключу:

img
img

undedbykey:

img
img

Мы видим, что reduceByKey агрегирует один и тот же ключ перед перемешиванием.

1.6 Использование высокопроизводительных операторов

Вы можете понять это сами.Примеры, приведенные в статье Meituan, следующие:

使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后进行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition与sort类操作

1.7 Вещание больших переменных

При использовании внешней переменной Spark по умолчанию копирует несколько копий переменной и передает ее задаче по сети, в это время у каждой задачи есть копия переменной. Если сама переменная относительно велика (например, 100M или даже 1G), накладные расходы на производительность из-за передачи большого количества копий переменных в сети, а также частый GC, вызванный чрезмерным использованием памяти в Executor каждого узла, сильно повлияет на производительность.

Поэтому в описанной выше ситуации, если используемая внешняя переменная относительно велика, рекомендуется использовать функцию широковещательной передачи Spark для широковещательной передачи переменной. Широковещательная переменная гарантирует, что только одна копия переменной находится в памяти каждого Исполнителя, а задачи в Исполнителе совместно используют копию переменной в Исполнителе при выполнении. Таким образом, количество копий переменных может быть значительно уменьшено, тем самым снижая нагрузку на производительность при передаче по сети, уменьшая нагрузку на память Executor и уменьшая частоту GC.

простое описание предложенияЭто поставить в каждую задачу целую переменную, но теперь сохранить копию в Executor, а потом прийти и получить ее, когда задаче это понадобится.

1.8 Оптимизация производительности сериализации с помощью Kryo

В Spark сериализация в основном задействована в трех местах:

  1. При использовании внешней переменной переменная будет сериализована для передачи по сети.
  2. Когда настраиваемый тип используется в качестве универсального типа RDD (например, JavaRDD, Student является настраиваемым типом), все объекты настраиваемого типа будут сериализованы. Следовательно, в этом случае также требуется, чтобы пользовательский класс реализовывал интерфейс Serializable.
  3. При использовании сериализуемой стратегии сохраняемости (например, MEMORY_ONLY_SER) Spark сериализует каждый раздел в RDD в большой массив байтов.

Мы все можем оптимизировать производительность сериализации и десериализации с помощью библиотеки сериализации Kryo, и производительность примерно в 10 раз выше.

1.9 Оптимизация структуры данных

В Java есть три типа интенсивного использования памяти:

  1. Объект, каждый объект Java имеет дополнительную информацию, такую ​​как заголовок объекта, ссылка и т. д., поэтому он занимает место в памяти.
  2. Строки, каждая из которых содержит массив символов и дополнительную информацию, например длину.
  3. Типы коллекций, такие как HashMap, LinkedList и т. д., поскольку типы коллекций обычно используют некоторые внутренние классы для инкапсуляции элементов коллекции, например Map.Entry.

Spark официально рекомендует при реализации кода Spark, особенно для кода в операторных функциях, стараться не использовать три вышеуказанные структуры данных, пытаться использовать строки вместо объектов, использовать примитивные типы (такие как Int, Long) вместо строк, использовать массивы замените типы коллекций, чтобы свести к минимуму использование памяти, тем самым уменьшив частоту сборки мусора и повысив производительность.

  1. Не используйте объекты, которые могут быть представлены строками json, потому что заголовок объекта занимает дополнительные 16 байт.

  2. Если вы можете использовать строку, вам не нужно использовать строку, потому что строка занимает дополнительные байты 40. Например, если вы можете использовать 1, не используйте «1».

  3. Попробуйте использовать группы родов вместо типов коллекций.

  4. Конечно, мы не хотим хорошей производительности ради хорошей производительности, мы все равно должны учитывать читабельность кода и эффективность разработки.

1.10 Максимально возможная локализация данных

будет объяснено ниже

Во-вторых, запущенный процесс Spark

Это в основном то же самое, что и разделение Stage, которое в основном представляет собой вопрос, который нужно задавать на собеседовании.Это также очень просто задать.Расскажите мне о задаче Spark, а затем весь процесс после ее отправки.

2.1 Инициализация драйвера

Первый - это инициализация Драйвера, шаги четко обозначены на рисунке, поэтому я не буду расширять описание (есть какая-то проблема с draw.io, я не знаю, почему цвет фона шрифта по умолчанию небесно-голубой 😓, я не могу установить его обратно, так что я Минуточку)

2.2 Генерация и распределение задач

Когда код встречает оператор действия, будет сгенерирована задача задания.После создания задачи DAGScheduler выполнит разделение этапов (включая зависимости от ширины и алгоритмы разделения). Будут задания на этапе,Задачи на одном и том же этапе имеют одинаковую логику задачи, но обрабатываемые данные различаются.. Затем задача будет распределена между каждым работником для запуска.

Здесь я немного упомяну упомянутый здесь алгоритм распределения задач, но это последний пункт в десяти принципах настройки Spark Core.максимально возможная локализация данных"объяснил.

(Дополнительно) Описание уровня локализации процесса

Уровень локализации процесса:

  1. PROCESS_LOCAL: локализация процесса

Код и данные находятся в одном процессе, то есть в одном и том же исполнителе, задача вычисления данных выполняется исполнителем, а данные находятся в BlockManager исполнителя, производительность наилучшая.

  1. NODE_LOCAL: Локализованный узел в том же коде и данных в узле;

Например, данные находятся на узле в виде блока HDFS, и задача выполняется в исполнителе на узле; или данные и задача находятся в разных исполнителях на одном узле; данные необходимо передавать между процессами.

  1. NO_PREF
    Для задач, где данные получены одинаковые, нет разницы между хорошим и плохим

  2. RACK_LOCAL: Локализация стойки
    Данные и задачи находятся на двух узлах в стойке, данные необходимо передавать между узлами по сети

  3. ANY:
    Данные и задачи могут быть где угодно в кластере, а не в стойке, худшая производительность

После того, как мы отправляем задачу, появляется интерфейс мониторинга задач Spark. Вы должны правильно использовать этот интерфейс. Интерфейс Spark сделан хорошо. Например, мы видим, что локальность данных этой задачи — NODE_LOCAL, что отлично, но если локальность данных вашей задачи плохая, вы можете попробовать следующую настройку:

(Дополнительно) Как настроить

spark.locality.waitЗначение по умолчанию — 3 с, что означает, что при назначении задачи_PROCESS_LOCALставить задачи таким образом, но еслиPROCESS_LOCALЕсли это не выполняется, то подождите 3 секунды по умолчанию, чтобы увидеть, может ли он быть выделен в соответствии с этим уровнем, но если он ждет 3 секунды, он не будет реализован. затем нажмитеNODE_LOCALназначить на этом уровне

И так далее, и поэтому каждый раз три секунды. Но мы знаем, что, если вы хотите, чтобы код запустить более быструю скорость, то позвольте распределению задач как можно большеPROCESS_LOCALиNODE_LOCALуровень, поэтому при настройке дайте задаче подождать некоторое время на этих двух уровнях, чтобы как можно больше распределить задачи на эти два уровня. Таким образом, 3 секунды по умолчанию немного меньше.

spark.locality.wait.process 30s
spark.locality.wait.node 30s

Подождите немного дольше на этих двух уровнях

2.3 Шаги для возврата к Исполнителю

В Executor будет сгенерирован пул потоков, фактически соответствующий четвертому шагу диаграммы инициализации драйвера, который уже был сгенерирован.

Есть много мелких партнеров, которые могут не очень понимать взаимосвязь между различными существительными, я также скажу это на рисунке здесь, то есть в приложении будет много заданий, которые будут разделены на этапы, и будет много Задачи на этапе, а затем задача, соответствующая разделу, это так просто

1. Настройка на основе модели памяти Spark

1.1 Обзор

После того, как мы отправим задание Spark с помощью spark-submit, задание запустит соответствующий процесс Driver. В зависимости от используемого режима развертывания (deploy-mode) процесс драйвера может запускаться локально или на рабочем узле в кластере. Сам процесс Драйвера будет занимать определенное количество памяти и процессорного ядра в соответствии с заданными нами параметрами.

Первое, что должен сделать процесс Драйвер, — это обратиться к диспетчеру кластера (который может быть автономным кластером Spark или другим кластером управления ресурсами, например, наша компания использует YARN в качестве кластера управления ресурсами) для запуска заданий Spark. Ресурс здесь относится к процессу Executor. Менеджер кластера YARN запустит определенное количество процессов Executor на каждом рабочем узле в соответствии с параметрами ресурсов, которые мы установили для задания Spark.Каждый процесс Executor занимает определенное количество памяти и ядер ЦП.

1.2 Модель статической памяти

в 2016 годуspark 1.6Статическая модель памяти, используемая исполнителями spark в предыдущих версиях, но начиная со spark 1.6, была добавлена ​​унифицированная модель памяти. Настроить через параметр spark.memory.useLegacyMode. Значение по умолчанию — false, и таблица диапазонов использует новую модель динамической памяти.Если вы хотите использовать предыдущую модель статической памяти, вы должны изменить это значение на true.

Давайте сначала воспользуемся более простой схемой, чем официальная, чтобы объяснить общую ситуацию, понять ее можно примерно так.

Вот разделение -executor-memory, которое фактически разделено на три части: часть области памяти Storage, часть области Execution и некоторые другие области. Если вы используете статическую модель памяти, используйте эти параметры для управления:

spark.storage.memoryFraction:默认0.6
spark.shuffle.memoryFraction:默认0.2  
所以第三部分就是0.2

Если объем данных нашего кеша относительно велик или наши широковещательные переменные относительно велики, мы немного увеличим значение spark.storage.memoryFraction. Но если в нашем коде нет широковещательной переменной, нет кеша и больше перетасовок, то нам нужно увеличить значение spark.shuffle.memoryFraction.

Хорошо, тогда мы можем перейти к более сложной картине. На самом деле, вы обнаружите, что он более сдержан и развернут, чем тот, что выше меня.

Недостатки статической модели памяти:

После того, как мы настроили область памяти для хранения и область для выполнения, одна из наших задач предполагает, что памяти для выполнения недостаточно, но ее область памяти для хранения свободна, и они не могут заимствовать друг у друга, что недостаточно гибко, поэтому мы выпустили нашу новую Унифицированную модель памяти.

1.3 Унифицированная модель памяти


Модель динамической памяти сначала резервирует 300 м памяти, чтобы предотвратить переполнение памяти.

Модель динамической памяти делит общую память на две части, представленные параметром spark.memory.fraction, значение по умолчанию равно 0,6, эта часть делится на две небольшие части. На самом деле это две части: память для хранения и память для выполнения. Настраивается параметром spark.memory.storageFraction,Если значение spark.memory.storageFraction установлено равным 0,5, это означает, что на хранение приходится 0,5 в этом 0,6, то есть на выполнение приходится 0,1.(Примечание: десятые здесь полностью относятся к общему объему памяти, т.е.Не думайте, что spark.memory.storageFraction равно 0,5 означает, что на него приходится 0,5 spark.memory.fraction, нет, это означает 0,5 от общей памяти.)

Каковы характеристики унифицированной модели памяти?

Память для хранения и память выполнения могут быть заимствованы друг у друга. Не такая жесткая, как модель статической памяти, но есть правила:

Сценарий 1. Когда Execution используется, он обнаруживает, что памяти недостаточно, а затем данные из памяти хранилища удаляются на диск.

Сценарий 2: В начале память выполнения используется мало, но памяти, используемой хранилищем, много, поэтому хранилище занимает память выполнения, но позже выполнение также требует памяти и данных. в память хранилища в это время будет записываться на диск для освобождения места в памяти.

Осторожные друзья также обнаружат, что каждый раз, когда они отправляются выбрасывать хранилище.
Это связано с тем, что данные в исполнении будут использованы сразу, а данные в хранилище могут быть использованы не сразу.

1.4 Часть настройки ресурсов

После понимания основных принципов работы заданий Spark легко разобраться в параметрах, связанных с ресурсами. Так называемая настройка параметров ресурсов Spark на самом деле в основном предназначена для оптимизации эффективности использования ресурсов путем настройки различных параметров в различных местах, где ресурсы используются в рабочем процессе Spark, тем самым повышая производительность выполнения заданий Spark. Следующие параметры являются основными параметрами ресурсов в Spark.Каждый параметр соответствует определенной части принципа работы задания, а также предоставляет эталонное значение настройки.

1.4.1 num-executors

Описание параметра: Этот параметр используется дляУстановите общее количество процессов Executor, которые будут использоваться для выполнения заданий Spark.. Когда Драйвер запрашивает ресурсы у менеджера кластера YARN, менеджер кластера YARN попытается запустить соответствующее количество процессов Executor на каждом рабочем узле кластера в соответствии с вашими настройками. Этот параметр очень важен. Если он не установлен, по умолчанию для вас будет запускаться только небольшое количество процессов Executor. В это время скорость выполнения вашего задания Spark очень низкая.

Предложение по настройке параметров: обычно уместно установить от 50 до 100 процессов-исполнителей для запуска каждого задания Spark, и не рекомендуется устанавливать слишком мало или слишком много процессов-исполнителей. Если задано слишком малое значение, ресурсы кластера не могут быть использованы полностью; если задано слишком большое значение, большинство очередей не смогут предоставить достаточно ресурсов.

Я думаю, что это нормально начинать с num-executors в соответствии с1/10Более целесообразно тестировать количество узлов, то есть 100 узлов на 1000 узлов и 10 узлов на 100 узлов.

1.4.2 executor-memory

Описание параметра: Этот параметр используется дляУстановите память для каждого процесса Executor. Размер памяти Executor напрямую определяет производительность заданий Spark во многих случаях, а также напрямую связан с распространенными исключениями JVM OOM.

Предложение по настройке параметров: более целесообразно установить память каждого процесса Executor на 4G~8G. Однако это только ориентировочное значение, а конкретные настройки все же необходимо определить в соответствии с очередями ресурсов разных отделов. Вы можете увидеть максимальное ограничение памяти для очереди ресурсов вашей команды.Умножение num-executors на executor-memory не может превышать максимальный объем памяти очереди. Кроме того, если вы делитесь этой очередью ресурсов с другими людьми в команде, объем запрошенной памяти не должен превышать 1/3–1/2 от максимальной общей памяти очереди ресурсов, чтобы ваши собственные задания Spark не занимали все ресурсы очереди, что приводит к сбою выполнения заданий других коллег.

1.4.3 executor-cores

Описание параметра: Этот параметр используется дляУстановите количество ядер ЦП для каждого процесса Executor. Этот параметр определяет способность каждого процесса Executor выполнять потоки задач параллельно.. Поскольку каждое ядро ​​ЦП может одновременно выполнять только один поток задач, чем больше ядер ЦП имеет каждый процесс Executor, тем быстрее он может выполнять все назначенные ему потоки задач.

Предложение по настройке параметров: Более целесообразно установить количество ядер процессора Executor на 2~4. Это также должно быть определено в соответствии с очередями ресурсов разных отделов.Вы можете увидеть, каков максимальный предел ядра ЦП для ваших собственных очередей ресурсов, а затем определить, сколько ядер ЦП может выделить каждый процесс Исполнителя на основе количества установленных Исполнителей. . Также рекомендуется, чтобы, если очередь используется совместно с другими, уместно, чтобы количество исполнителей * число ядер исполнителя не превышало примерно 1/3~1/2 от общего числа ядер ЦП в очереди, а также во избежание влияния на другие домашнее задание учащихся.

Лично я считаю, что 1 ядро ​​процессора соответствует 3 задачам, это лучшая ситуация.

1.4.4 driver-memory

Описание параметра: Этот параметр используется для установкиПамять процесса драйвера.

Предложение по настройке параметров: Память драйвера обычно не устанавливается, или должно быть достаточно около 1G. Единственное, что следует отметить, это то, что если вам нужно использовать оператор сбора, чтобы вытащить все данные RDD в Драйвер для обработки, вы должны убедиться, что память Драйвера достаточно велика, иначе произойдет переполнение памяти OOM. проблема.

1.4.5 spark.default.parallelism

Описание параметра: Этот параметр используется для установкиКоличество задач по умолчанию на этап. Этот параметр чрезвычайно важен, если его не задать, он может напрямую повлиять на производительность вашего задания Spark.

Предложение по настройке параметров: количество задач по умолчанию для задания Spark составляет от 500 до 1000. Ошибка, которую часто совершают многие студенты, заключается в том, что если не задать этот параметр, то Spark установит количество задач в соответствии с количеством блоков в базовой HDFS, по умолчанию один блок HDFS соответствует одной задаче. Вообще говоря, количество настроек Spark по умолчанию слишком мало (например, десятки задач), при слишком малом количестве задач будут потеряны ранее установленные параметры Исполнителя.

Только представьте, сколько бы у вас ни было процессов Executor, сколько бы у вас ни было памяти и процессора, а задач было всего 1 или 10, то 90% процессов Executor могут вообще не выполнять задачи, что является пустой тратой ресурсов! Поэтому принцип настройки, предложенный официальным сайтом Spark, заключается в том, что более целесообразно установить этот параметр в 2-3 раза больше числа исполнителей * ядер исполнителя, например, общее количество ядер процессора Executor составляет 300, тогда можно поставить 1000 задач, в это время ресурсы кластера Spark могут быть использованы полностью.

1.4.6 spark.storage.memoryFraction

Собственно, об этом уже упоминалось. Описание параметра: Этот параметр используется дляУстановите долю постоянных данных RDD в памяти Executor, по умолчанию 0,6.То есть 60% памяти Исполнителя по умолчанию можно использовать для сохранения постоянных данных RDD. В зависимости от стратегии настойчивости вы выбираете, если не хватает памяти, данные могут не сохраняться, или данные будут записаны на диск.

Рекомендуемые параметры настройки: если операция Spark имеет более постоянную операцию RDD, значение номера параметра можно соответствующим образом улучшить, чтобы обеспечить постоянство данных в памяти. Избегайте недостатка памяти для кэширования всех данных, в результате чего данные могут быть записаны только на диск, что снижает производительность.

Однако если в задании Spark больше операций в случайном порядке и меньше постоянных операций, целесообразно соответствующим образом уменьшить значение этого параметра. Кроме того, если обнаруживается, что задание выполняется медленно из-за частого gc (время gc задания можно наблюдать через интерфейс spark web), это означает, что памяти задачи для выполнения пользовательского кода недостаточно, то также рекомендуется понизить значение этого параметра.

1.4.7 spark.shuffle.memoryFraction

Об этом только что упоминалось. Описание параметра: Этот параметр используется дляУстановите долю памяти Executor, которая может использоваться для операций агрегирования после того, как задача извлечет выходные данные задачи предыдущего этапа в процессе перемешивания.По умолчанию 0,2.. То есть Executor по умолчанию использует только 20% памяти, используемой для этой операции. Когда операция перемешивания выполняет агрегирование, если обнаруживается, что используемая память превышает предел 20%, лишние данные будут перезаписаны в файл на диске, что значительно снизит производительность.

Предложение по настройке параметров: если в задании Spark мало постоянных операций RDD и много операций перетасовки, рекомендуется уменьшить долю памяти для постоянных операций и увеличить долю памяти для операций перетасовки, чтобы избежать нехватки памяти при слишком большом количестве данных во время выполнения. процесс перемешивания.Если он используется, его необходимо перезаписать на диск, что снижает производительность. Кроме того, если обнаружено, что задание выполняется медленно из-за частого gc, а это значит, что памяти задания для выполнения пользовательского кода недостаточно, то также рекомендуется понизить значение этого параметра.

1.4.8 Краткое резюме

Не существует фиксированного значения для настройки параметров ресурсов. Оно должно основываться на вашей реальной ситуации (включая количество операций перетасовки в заданиях Spark, количество постоянных операций RDD и статус gc задания, отображаемый в веб-интерфейсе Spark). ) Также обратитесь к этой статье.Установите вышеуказанные параметры разумно в соответствии с принципами и рекомендациями по настройке, приведенными в .

Ниже приведен пример команды spark-submit, вы можете обратиться к ней и настроить ее в соответствии с вашей реальной ситуацией:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

Параметр --master yarn-cluster требует внимания, так как этот параметр несовместим для разных версий Spark. Поэтому не забывайте обращать внимание на версию в нашем кластере при отправке задач.Если это версия Spark 1.5, это просто --master yarn-cluster.Если это версия 2.4, этот параметр снова становится --master yarn, поэтому он должен быть обратить внимание

Если что-то вродеjava.lang.OutOfMemoryError, ExecutorLostFailure, Код выхода исполнителя 143, исполнитель потерялся, тайм-аут сердцебиения, файл в случайном порядке потерян...Подождите, не нервничайте, вполне вероятно, что проблема не в памяти, вы можете сначала попробовать увеличить память. Если вы все еще не можете решить ее, сосредоточьтесь на перекосе данных.

2. Настройка перекоса данных

2.1 Обзор

Иногда мы можем столкнуться с одной из самых сложных проблем при обработке больших данных — перекосом данных, и производительность заданий Spark будет намного хуже, чем ожидалось. Настройка перекоса данных заключается в использовании различных технических решений для решения различных типов проблем перекоса данных, чтобы обеспечить производительность заданий Spark.

2.1.1 Феномен перекоса данных

Большинство задач выполняются очень быстро, но некоторые задачи выполняются очень медленно. Например, всего 1000 задач, 997 задач выполняются в течение 1 минуты, а оставшиеся две-три задачи займут час-два. Эта ситуация очень распространена.

Задание Spark, которое могло нормально выполняться, однажды внезапно сообщило об исключении OOM (недостаточно памяти).Обратите внимание на стек исключений, вызванный написанным нами бизнес-кодом. Эта ситуация относительно редка.

2.1.2 Принцип перекоса данных

Принцип наклона данных прост: при выполнении перетасовки один и тот же ключ должен быть передан задаче на узле, которая будет обрабатываться на каждом узле, например полимеризация или тому подобное, в соответствии с ключом для операций соединения. В это время, если данные соответствуют ключу, то, в частности, происходит искажение данных.

Например, большинство ключей соответствуют 10 фрагментам данных, но отдельные ключи соответствуют 1 миллиону фрагментов данных, тогда большинству задач может быть выделено только 10 фрагментов данных, а затем они могут выполняться за 1 секунду; но отдельные задачи могут быть выделены для 1 миллион фрагментов данных для работы в течение часа или двух. Таким образом, ход выполнения всего задания Spark определяется задачей с наибольшим временем выполнения.

Поэтому, когда данные искажены, кажется, что задание Spark выполняется очень медленно и может даже вызвать переполнение памяти из-за чрезмерного объема данных, обрабатываемых задачей.

Вышеприведенная картинка является очень наглядным примером: ключ hello соответствует в общей сложности 7 фрагментам данных на трех узлах, и эти данные будут подтягиваться в одну и ту же задачу для обработки; а два ключа world и you соответственно соответствуют 1 часть данных, поэтому двум другим задачам нужно обработать только 1 часть данных соответственно. В это время время выполнения первой задачи может быть в 7 раз больше, чем у двух других задач, а скорость выполнения всего этапа также определяется самой медленно выполняющейся задачей.

2.1.3 Как найти код искажения данных

Перекос данных происходит только во время перемешивания. Вот некоторые часто используемые операторы, которые могут инициировать операции перемешивания: Different, groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition и т. д. Когда возникает перекос данных, это может быть вызвано использованием одного из этих операторов в вашем коде.

Когда задача выполняется очень медленно
Первое, на что следует обратить внимание, — на каком этапе происходит искажение данных..
Если вы отправляете в режиме yarn-client, вы можете напрямую просматривать журнал локально, и вы можете найти количество этапов, которые в настоящее время выполняются в журнале; если вы отправляете в режиме yarn-cluster, вы можете просмотреть текущий через веб-интерфейс Spark. , Бежим к первому этапу. также,Независимо от того, используете ли вы режим yarn-client или yarn-cluster, мы можем тщательно изучить объем данных, выделенных каждой задачей на текущем этапе в веб-интерфейсе Spark, чтобы дополнительно определить, являются ли неравномерные данные, выделенные задачей. вызывает искажение данных.

Мы можем найти оператора с перекосом данных следующим образом:Spark Web UI Midpoint в приложении там будет много работы, потому что работа состоит в том, чтобы определить действие для отделения оператора, поэтому мы посмотрим на наш оператор действий кода, чтобы определить местоположение кода, а затем указать на работу Будет этап, сцена определяется классом оператора Shuffle, но и для определения положения положения волны кода в качестве основы, на этот раз потребляющая задача, которую они давно видят, он знал бы на каком этапе (оператор перемены) проблема


Например, на рисунке выше предпоследний столбец показывает время выполнения каждой задачи. Хорошо видно, что некоторые задачи выполняются очень быстро и для их выполнения требуется всего несколько секунд, а некоторые задачи выполняются очень медленно и для их выполнения требуется несколько минут. В настоящее время это можно определить только по времени выполнения. перекошенный. Кроме того, в предпоследнем столбце показано количество данных, обработанных каждой задачей.Очевидно, что задаче с очень коротким временем выполнения нужно обработать всего несколько сотен КБ данных, а задаче с очень большим временем выполнения необходимо обрабатывать несколько тысяч килобайт, объем обрабатываемых данных в 10 раз хуже. В настоящее время более очевидно, что произошел перекос данных.

Однако следует отметить, что перекос данных не может быть определен только случайным переполнением памяти. Из-за ошибок в коде, написанном вами, а также случайных исключений данных также могут возникать переполнения памяти. Следовательно, по-прежнему необходимо проверять время выполнения и выделенный объем данных каждой задачи этапа, на котором сообщается об ошибке через веб-интерфейс Spark, в соответствии с методом, упомянутым выше, чтобы определить, вызвано ли переполнение памяти перекос данных.

2.1.4 Просмотр распределения данных по ключам, вызывающим искажение данных

Узнав, где происходит перекос данных, обычно необходимо проанализировать таблицу RDD/Hive, которая выполнила операцию перемешивания и вызвала перекос данных, и проверить распределение ключей. Это делается главным образом для того, чтобы обеспечить основу для выбора технического решения в будущем. Для различных ситуаций, когда сочетаются разные распределения ключей и разные операторы тасования, может потребоваться выбор разных технических решений.

На этом этапе, в зависимости от того, как вы выполняете операцию, существует множество способов просмотра распределения ключей:

  1. Если данные искажены операторами group by и join в Spark SQL, запросите распределение ключей таблиц, используемых в SQL.
  2. Если данные искажаются из-за выполнения оператора перемешивания в RDD Spark, вы можете добавить код для проверки распределения ключей в задании Spark, например RDD.countByKey(). Затем соберите/отнесите клиенту подсчитанное количество вхождений каждого ключа и распечатайте его, и вы сможете увидеть распределение ключей.

2.2 Решения для перекоса данных

2.2.1 Предварительная обработка данных с помощью Hive ETL

На самом деле, этот трюк предназначен исключительно для того, чтобы сэкономить некоторые трудоемкие операции для запуска ранним утром, а затем напрямую получить результаты выполнения ранним утром, когда вам понадобятся данные на следующий день (обманите себя 😂). Это подходит для данных в самой таблице Hive, которые очень неравномерны (например, ключ соответствует 1 миллиону данных, другие ключи соответствуют только 10 данным), а бизнес-сценарий требует частого использования Spark для выполнения операции анализа на стол Улей

Предварительная обработка данных выполняется через Hive (то есть данные предварительно агрегируются по ключу через Hive ETL или предварительно объединяются с другими таблицами. В это время, поскольку данные были агрегированы или объединены заранее, задание Spark также будет Для выполнения таких операций нет необходимости использовать оригинальный оператор тасования, он прост и удобен в реализации, а эффект очень хороший

Но у этого трюка также будут сценарии, где его нельзя использовать, то есть, если вы хотите играть в реальном времени, его нельзя использовать.

2.2.2 Отфильтровать несколько ключей, вызывающих перекос

Если мы считаем, что несколько ключей с большим объемом данных не особенно важны для выполнения и результатов расчета задания, то просто отфильтруем эти несколько ключей. Если вам нужно динамически определять, какие ключи имеют наибольший объем данных каждый раз при выполнении задания, а затем фильтровать их, вы можете использовать оператор выборки для выборки RDD, затем вычислить номер каждого ключа и отфильтровать ключ с помощью наибольший объем данных.

После фильтрации ключей, вызывающих перекос данных, эти ключи не будут участвовать в расчете, и, естественно, генерировать перекос данных невозможно.Только что упомянутый оператор позиционирования пригодится сразу.Если у вас большое количество, я сразу Я удалил тебя (еще один трюк, чтобы обмануть себя 😂)

2.2.3 Улучшение параллелизма операций перемешивания

Увеличение количества задач случайного чтения позволяет назначить несколько клавиш, изначально назначенных одной задаче, нескольким задачам, что позволяет каждой задаче обрабатывать меньше данных, чем раньше.

Например, если изначально имеется 5 ключей, каждый ключ соответствует 10 элементам данных, и эти 5 ключей выделены задаче, то эта задача будет обрабатывать 50 элементов данных. После добавления задачи случайного чтения каждой задаче назначается ключ, то есть каждая задача обрабатывает 10 фрагментов данных, поэтому время выполнения каждой задачи естественно сократится. Его относительно просто реализовать, и он может эффективно смягчить и смягчить влияние перекоса данных.

Это решение на самом деле очень нелепое, на самом деле оно вообще не может решить перекос данных, потому что если есть какие-то крайние случаи, например, количество данных, соответствующих ключу, равно 1 миллиону, то сколько бы задач вы ни увеличивали, этот ключ соответствует 1 миллиону данных. Он определенно все еще будет выделен задаче для обработки, поэтому обязательно произойдет перекос данных.

Первые три решения бесполезны, и все они уходят от проблемы, начиная с четвертого решение становится разумным.

2.2.4 Двухэтапная агрегация (локальная агрегация + глобальная агрегация)

Это решение больше подходит при выполнении операторов перемешивания агрегации, таких как reduceByKey в RDD, или при использовании инструкций group by в Spark SQL для групповой агрегации.

Основная идея реализации заключается в выполнении двухэтапной агрегации.

  1. Первый раз — локальная агрегация.Во-первых, каждый ключ помечается случайным числом, например, случайным числом в пределах 10. В это время один и тот же ключ станет другим.
    Например(hello, 1) (hello, 1) (hello, 1) (hello, 1), станет(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1).

  2. Затем выполните операции агрегации, такие как reduceByKey для данных после случайного числа, и выполните локальную агрегацию, после чего результат локальной агрегации станет(1_hello, 2) (2_hello, 2).

  3. Затем удалите префикс каждого ключа, он станет(hello,2)(hello,2),

  4. Выполните глобальную операцию агрегирования еще раз, чтобы получить окончательный результат, и, наконец,(hello, 4).

Преимущество этого метода заключается в том, что перекос данных, вызванный операцией перемешивания класса агрегации, очень хорош. Обычно перекос данных удается устранить или, по крайней мере, значительно уменьшить, а производительность заданий Spark повысить в несколько раз. Однако недостаток также очевиден: он применим только к операции перемешивания класса агрегации, а область применения относительно узка. Если это операция перемешивания класса соединения, необходимо использовать другие решения.

2.2.5 Преобразование соединения сокращения в соединение карты

Это решение подходит при использовании операций соединения для RDD или операторов соединения в Spark SQL, а объем данных в RDD или таблице в операции соединения относительно невелик (например, несколько сотен М или один или два гигабайта).

Обычное соединение будет проходить через процесс перемешивания, и после перемешивания это эквивалентно извлечению данных одного и того же ключа в задачу чтения в случайном порядке, а затем объединению, которое является соединением сокращения. Но если RDD относительно невелик, вы можете использоватьВещание полных данных малого RDD + картографический операторДля достижения того же эффекта, что и при объединении, т. е. при объединении карты, в это время не будет выполняться операция перемешивания и перекос данных.

Преимущества: при перекосе данных, вызванном операцией объединения, непосредственно предотвращается перетасовка, и перекоса данных вообще нет.

Недостатки схемы: меньше применимых сценариев, т.к.Применяется только в случае одной большой таблицы и одной маленькой таблицы. Поскольку нам нужно транслировать небольшие таблицы, это будет потреблять больше ресурсов памяти.Драйвер и каждый Executor будут находиться в памяти небольшого RDD с полными данными. Если данные RDD, которые мы транслируем, относительно велики, например, более 10G, может произойти переполнение памяти. Поэтому он не подходит для случая, когда обе таблицы большие.

2.2.6 (Приятно) сэмплирование косого ключа и операция разделения соединения

Когда две таблицы RDD/Hive соединены, если объем данных относительно велик и нельзя использовать 2.2.5, вы можете посмотреть распределение ключей в двух таблицах RDD/Hive.Если данные искажены, это связано с тем, что объем данных нескольких ключей в одной таблице RDD/Hive слишком велик, в то время как все ключи в другой таблице RDD/Hive распределены равномерно., то это решение является более подходящим.

  1. Для RDD, который содержит несколько ключей со слишком большим объемом данных,Используйте оператор выборки для выборки выборки, затем подсчитайте количество каждого ключа и вычислите, какие ключи содержат наибольший объем данных..
  2. Затем извлеките данные, соответствующие этим ключам, из исходного RDD.Разделите его, чтобы сформировать отдельный RDD, и добавьте к каждому ключу префикс со случайным числом в пределах n, чтобы большинство искаженных ключей не образовывали еще один СДР.
  3. Затем другой СДР, который необходимо соединить, также отфильтровывается из данных, соответствующих наклонным клавишам, для формирования отдельного СДР, и каждые данные расширяются на n частей данных, и к этим n частям данных добавляется последовательность 0 Префикс ~n., большинство ключей, которые не вызывают искажения, также образуют другой RDD.

На этом этапе мы можем придумать эту процедуру.Если в RDDA1 или RDDB1 есть небольшой объем данных, которые могут соответствовать условиям схемы 5, то непосредственно выполнить схему 5

Но на самом деле в реальной производственной среде и то, и другое очень велико, поэтому нам нужно продолжать совершенствоваться.

  1. Затем присоедините независимый СДР со случайным префиксом и еще один независимый СДР емкостью n раз, при этом исходный же ключ можно разбить на n частей и распределить на несколько задач для объединения.
  2. Два других обычных RDD можно соединить как обычно.
  3. Наконец, результаты двух объединений можно объединить с помощью оператора объединения, который является окончательным результатом соединения.

Для перекоса данных, вызванного объединением, если только несколько ключей вызывают перекос, вы можетеРазделите несколько ключей на независимые RDD, добавьте случайные префиксы и разбейте их на n частей для объединения.В это время данные, соответствующие этим ключам, не будут сосредоточены на нескольких задачах, а распределены по нескольким задачам..

PS: Не надо думать, что расширять мощности в 3 раза нехорошо.Возникновение перекоса данных может привести к тому, что задача будет рассчитана на несколько дней или даже на десять дней.По сравнению с этим риском цена этого расширение мощностей того стоит.

Преимущества: для перекоса данных, вызванного объединением, если только несколько ключей вызывают перекос, этот метод можно использовать для разбиения ключей наиболее эффективным способом объединения. При этом необходимо лишь увеличить емкость данных, соответствующих нескольким наклонным ключам, в n раз, и нет необходимости расширять емкость всего объема данных. Не занимайте слишком много памяти.

Недостатки схемы: Если слишком много ключей, вызывающих перекос, например, тысячи ключей вызывают перекос данных, то этот метод не подходит.

2.2.7 Присоединение со случайным префиксом и масштабированием RDD

Если большое количество ключей в RDD приводит к искажению данных во время операции объединения, разбивать ключи бессмысленно, в настоящее время для решения проблемы можно использовать только это решение. Просто и грубо, необоснованно 😤

  1. Идея реализации этого решения в основном аналогична "Решению 6". Во-первых, проверьте распределение данных в таблице RDD/Hive и найдите таблицу RDD/Hive, которая вызывает перекос данных. Например, есть несколько ключей, соответствующих более 10 000 единиц данных.
  2. Затем каждый фрагмент данных в RDD имеет префикс со случайным префиксом в пределах n.
  3. В то же время другой обычный RDD расширяется, и каждая часть данных расширяется до n частей данных, и каждая часть расширенных данных по очереди имеет префикс 0 ~ n.
  4. Наконец, соедините два обработанных RDD.

Предыдущее решение состоит в том, чтобы выполнять специальную обработку только данных, соответствующих нескольким искаженным ключам.Поскольку СДР необходимо расширить в процессе обработки, использование памяти в предыдущем решении после расширения СДР невелико.В случае перекошенных ключей невозможно разделить некоторые ключи для отдельной обработки, поэтому расширение данных возможно только на весь RDD, что требует больших ресурсов памяти.

Синтетически и гибко управлять несколькими вышеупомянутыми решениями, ориентированными на данные, чтобы гарантировать, что проблемы, возникающие ежедневно, могут быть решены в основном с помощью идей.


3. Настройка в случайном порядке

Spark Shuffle был не очень зрелым до Spark 1.3.В то время в интервью может быть много упоминаний.На самом деле, текущего интервью недостаточно, чтобы спросить об этих знаниях, но если мы хотим понять технологию глубоко , то мне еще предстоит раскопать эти детали и узнать о прошлом и настоящем.

Содержание настройки здесь в основном состоит в том, чтобы поставить предыдущийРезюме и анализ Spark's ShuffleУпомянутые в нем параметры объясняются еще раз

3.1 (Повышение производительности) spark.shuffle.file.buffer

  • По умолчанию: 32 КБ
  • Описание параметра: Этот параметр используется для установки размера буфера BufferedOutputStream задачи случайной записи. Перед записью данных в файл на диске они сначала будут записаны в буферный буфер, а переполнение будет записано на диск только после заполнения буфера.
  • Предложение по настройке: если доступных ресурсов памяти для задания достаточно, вы можете соответствующим образом увеличить размер этого параметра (например, 64 КБ), тем самым уменьшив количество раз переполнения дисковых файлов во время процесса записи в случайном порядке, что также может уменьшить количество дисковых операций ввода-вывода

3.2 (Повышение производительности) spark.reducer.maxSizeInFlight

  • По умолчанию: 48 м
  • Описание параметра: этот параметр используется для установки размера буфера задачи чтения в случайном порядке иЭтот буфер буфера определяет, сколько данных может быть извлечено каждый раз.
  • Предложение по настройке: если доступных ресурсов памяти для задания достаточно, вы можете соответствующим образом увеличить размер этого параметра (например, 96 м), тем самым уменьшив количество извлечений данных, что также может уменьшить количество сетевых передач.

3.3 (улучшить стабильность) spark.shuffle.io.maxRetries

  • По умолчанию: 3
  • Описание параметра: Когда задача чтения в случайном порядке извлекает свои собственные данные с узла, на котором расположена задача записи в случайном порядке, если извлечение не удается из-за сбоя в сети, она автоматически повторяет попытку. Этот параметр представляет максимальное количество повторных попыток, которое может быть сделано. Если получение не будет успешным в течение указанного количества раз, это может привести к сбою задания.
  • Предложение по настройке: для тех заданий, которые включают особенно трудоемкие операции перемешивания, рекомендуется увеличить максимальное количество повторных попыток (например, 60 раз), чтобы избежать сбоев извлечения данных, вызванных такими факторами, как полная сборка мусора JVM или нестабильность сети. Настройка этого параметра может значительно улучшить стабильность процесса тасования для чрезвычайно больших объемов данных (от миллиардов до десятков миллиардов).

3.4 (улучшить стабильность) spark.shuffle.io.retryWait

  • По умолчанию: 5 с
  • Описание параметра: конкретное объяснение такое же, как и выше.Этот параметр представляет собой интервал ожидания для каждой попытки получения данных, и по умолчанию он равен 5 с.
  • Предложение по настройке: рекомендуется увеличить интервал времени (например, 60 с), чтобы повысить стабильность операции перемешивания.

3.5 (модель памяти) spark.shuffle.memoryFraction

  • По умолчанию: 0,2
  • Описание параметра:(Spark1.6 это параметр, после 1.6 параметр становится spark.memory.fraction) Этот параметр представляет долю памяти, выделенную задаче случайного чтения для операций агрегирования в памяти Executor.По умолчанию 20%.
  • Предложение по настройке: этот параметр объясняется в разделе Настройка параметров ресурсов. Если памяти достаточно и постоянные операции используются редко, рекомендуется увеличить это соотношение и выделить больше памяти для операции агрегации случайного чтения, чтобы избежать частого чтения и записи дисков в процессе агрегации из-за нехватки памяти. На практике установлено, что разумная регулировка этого параметра позволяет повысить производительность примерно на 10 %.

3.6 spark.shuffle.manager

  • По умолчанию: сортировать
  • Описание параметра: Этот параметр используется для установки типа ShuffleManager. После Spark 1.5 есть три варианта: hash, sort и tungsten-sort. HashShuffleManager был параметром по умолчанию до Spark 1.2, но в Spark 1.2 и более поздних версиях по умолчанию используется SortShuffleManager. После Spark 1.6 метод хеширования был удален, tungsten-sort похож на sort, но использует механизм управления памятью вне кучи в плане tungsten, что делает использование памяти более эффективным.
  • Предложение по настройке: поскольку SortShuffleManager сортирует данные по умолчанию, если этот механизм сортировки требуется в вашей бизнес-логике, вы можете использовать SortShuffleManager по умолчанию; и если ваша бизнес-логика не требует сортировки данных, рекомендуется обратиться к следующим параметрам: настроен, чтобы избежать операций сортировки с помощью механизма обхода или оптимизированного HashShuffleManager, обеспечивая при этом лучшую производительность чтения и записи на диск. Здесь следует отметить, что tungsten-sort следует использовать с осторожностью, поскольку некоторые соответствующие ошибки были обнаружены ранее.

3.7 spark.shuffle.sort.bypassMergeThreshold

  • По умолчанию: 200
  • Описание параметра: Когда ShuffleManager имеет значение SortShuffleManager, если количество задач чтения в случайном порядке меньше этого порога (по умолчанию 200), операция сортировки не будет выполняться в процессе записи в случайном порядке, а данные будут записаны непосредственно в пути неоптимизированного HashShuffleManager, но в итоге все временные файлы на диске, сгенерированные каждой задачей, будут объединены в один файл, и будет создан отдельный индексный файл.
  • Предложение по настройке: если вы используете SortShuffleManager, если вам не нужна операция сортировки, рекомендуется увеличить этот параметр больше, чем количество задач чтения в случайном порядке. Тогда механизм обхода будет автоматически включен в это время, и сторона карты не будет сортироваться, что снижает нагрузку на производительность сортировки. Однако таким образом все равно будет создано большое количество файлов на диске, поэтому производительность произвольной записи необходимо повысить.

finally

Десять тысяч слов — это непросто, это также последняя статья о ядре Spark. Я надеюсь быть полезным. В будущем на Spark Streaming будет больше контента.Заинтересованные друзья могут обратить на него внимание.Публичный аккаунт: Сообщите свои пожелания