Spark Learning — настройка производительности (2)

Spark

Другие основные статьи по Java:
Базовое изучение Java (справочник)


Перейти к предыдущей статьеSpark Learning — настройка производительности (1)Объяснить

Уменьшить коэффициент памяти операций кэширования

Про использование RDD-памяти я напишу позже специальную статью, которую можно будет прочитать вместе с изучением

В spark память кучи разделена на две части: одна специально используется для кэширования RDD и операций сохранения для кэширования данных RDD; другая, как мы только что сказали, используется для искровых вычислений. Она используется операцией подфункции для хранения созданных им объектов в функции.

По умолчанию коэффициент памяти для операций кэширования RDD равен 0,6, и 60% памяти отводится операциям кэширования. Но проблема в том, что если кеш в некоторых случаях не такой плотный, то проблема в том, что в функции оператора задачи создается слишком много объектов, а памяти не слишком много, что приводит к частым минорным сборкам мусора и даже частым полным gc, что приводит к искровым частым остановкам. Влияние на производительность может быть огромным.

В ответ на вышеописанную ситуацию вы можете просмотреть статистику запуска вашего spark job в интерфейсе spark ui, если вы запускаете его через пряжу, то через пряжу интерфейс. Вы можете видеть статус выполнения каждого этапа, включая время выполнения каждой задачи, время gc и т. д. Если обнаруживается, что gc встречается слишком часто, время слишком велико. На этом этапе цена может быть скорректирована соответствующим образом.

Чтобы уменьшить долю памяти операций кэширования, очень важно использовать операции сохранения, выбрать запись части кэшированных данных RDD на диск или сериализовать их, а также сотрудничать с классом сериализации Kryo, чтобы уменьшить использование памяти кэшами RDD. ;уменьшить коэффициент памяти операций кэширования, соответствующий да, увеличен коэффициент памяти операторной функции. В это время можно уменьшить частоту второстепенных gc и уменьшить частоту полных gc одновременно. Это полезно для улучшения производительности.

Короче говоря, пусть функции оператора выполнения задач, тем больше можно использовать память.

SparkConf conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")

Исполнитель настройки

Иногда, если объем данных, обрабатываемых вашим заданием Spark, особенно велик, сотни миллионов данных, а затем после запуска задания Spark время от времени появляются сообщения об ошибках, таких как не удается найти файл в случайном порядке, исполнитель, задача потеряна и т. нехватка памяти (переполнение памяти);

Может случиться так, что памяти вне кучи исполнителя недостаточно, что может привести к переполнению памяти во время запущенного процесса исполнителя; тогда это может привести к тому, что карта перемешивания будет вытягиваться из некоторых исполнителей при выполнении задач последующего этапы выполняются, выходной файл, но исполнитель, возможно, умер, и связанный с ним диспетчер блоков исчез; поэтому он может сообщить, что выходной файл в случайном порядке не найден, повторная отправка задачи, исполнитель потерян, задание spark полностью завершилось сбоем.

В приведенном выше случае вы можете рассмотреть возможность настройки вне кучи памяти исполнителя. Ошибок можно избежать, кроме того, иногда, когда корректировка памяти вне кучи относительно велика, это также приводит к некоторому улучшению производительности.

Как настроить память executor вне кучи

--conf spark.yarn.executor.memoryOverhead=2048

В скрипте spark-submit используйте метод --conf для добавления конфигурации, обязательно обратите внимание! ! ! Помните, что не в вашем коде задания искры используйте new SparkConf().set(), чтобы установить его, не устанавливайте его так, это бесполезно! Обязательно установите его в скрипте spark-submit.

spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式)

По умолчанию верхний предел этой памяти вне кучи составляет примерно более 300 Мб, в наших обычных проектах, когда мы действительно имеем дело с большими данными, здесь будут проблемы, вызывающие многократное падение спарк-джоба и невозможность его запуска; в это время мы настроим этот параметр, по крайней мере, на 1G (1024M), даже на 2G, 4G

Обычно после настройки этого параметра можно избежать некоторых нештатных проблем с JVM OOM, и в то же время производительность всей искровой работы будет значительно улучшена.

Настройте время ожидания соединения

Если локального менеджера блоков не существует, он будет получен путем удаленного подключения к менеджеру блоков исполнителя на других узлах через TransferService. Если случится так, что исполнителями на других узлах будут GC, в это время не будет ответа, и сетевое соединение не может быть установлено; оно зависнет; ок, ​​время ожидания по умолчанию для сетевого соединения spark составляет 60 с; если оно зависает на 60 секунд, соединение не может быть установлено. Если это так, то оно объявляется неудачным.

Столкнитесь с ситуацией, время от времени, время от времени, время от времени! ! ! нерегулярный! ! ! такой-то файл. Строка идентификаторов файлов. UUID (dsfsfd-2342vs--sdf--sdfsd). не найден. файл потерян.

В этом случае очень вероятно, что исполнитель с этими данными находится в файле jvm gc. Поэтому при вытягивании данных соединение не может быть установлено. Затем после 60-х по умолчанию он напрямую объявляет об отказе.

Если об ошибке сообщается несколько раз и данные не могут быть извлечены несколько раз, может произойти сбой задания Spark. Это также может привести к тому, что DAGScheduler будет повторно отправлять этапы несколько раз. TaskScheduler, повторно отправляет задачи несколько раз. Значительно увеличено время выполнения наших искровых заданий.

Рассмотрите возможность настройки периода ожидания подключения.

--conf spark.core.connection.ack.wait.timeout=300

Помните, что скрипт spark-submit не настроен так, как new SparkConf().set().

Объединить выходные файлы карты

Условия реальной производственной среды:
100 узлов (один исполнитель на узел): 100 исполнителей
Каждый исполнитель: 2 процессорных ядра
Всего 1000 задач: в среднем 10 задач на исполнителя
Каждый узел, 10 задач, сколько файлов карты будет выводить каждый узел? 10 * 1000 = 10 000 файлов
Сколько всего выходных файлов на стороне карты? 100 * 10000 = 1 миллион.

  • Операция записи на диск в случайном порядке — это, по сути, самая серьезная часть потребления производительности в случайном порядке. Согласно вышеприведенному анализу, случайное соединение искрового задания в обычной производственной среде запишет на диск 1 миллион файлов. Влияние дискового ввода-вывода на производительность и скорость выполнения искровых заданий чрезвычайно ошеломляюще и пугающе. По сути, производительность искровых заданий потребляется при перемешивании, хотя это не только часть выходного файла на стороне карты при перемешивании, но и очень большая точка потребления производительности.

Включить механизм слияния выходных файлов карты

Механизм слияния выходного файла стороны карты можно включить следующей командой

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

Как показано выше:

  • На первом этапе запускать задачи ядра процессора одновременно, например, если имеется 2 ядра процессора, запускать параллельно 2 задачи, каждая задача создает количество файлов задач следующего этапа;
  • На первом этапе, после выполнения двух задач, запущенных параллельно, будут выполнены две другие задачи; две другие задачи не будут воссоздавать выходной файл; вместо этого будет повторно использоваться выходной файл на стороне карты, созданный предыдущей задачей. , а данные записываются в выходной файл предыдущего пакета задач.
  • На втором этапе, когда задача извлекает данные, она не извлекает выходной файл, созданный каждой задачей на предыдущем этапе, а извлекает небольшое количество выходных файлов.Каждый выходной файл может содержать несколько задач для выполнения. вывод на свою сторону карты.

Что происходит с приведенным выше примером после слияния выходных файлов на стороне карты?

  1. IO задачи записи карты в файлы на диск, сокращение: 1 миллион файлов -> 200 000 файлов
  2. Второй этап, изначально вытащить количество файлов задач первого этапа, 1000 задач, каждой задаче второго этапа нужно вытащить 1000 файлов и передать их по сети, после слияния 100 узлов, в каждом узле по 2 ядер процессора, а каждая задача второго этапа в основном тянет 100*2=200 файлов, сильно ли снижается потребление производительности передачи по сети?

Напоминаем (слияние выходных файлов на стороне карты):
Только задачи, выполняемые параллельно, будут создавать новые выходные файлы; следующая партия задач, выполняемых параллельно, будет повторно использовать существующие выходные файлы; но есть исключение, например, две задачи выполняются параллельно, но в это время нужно выполнить две задачи. выполняться при запуске; тогда в это время выходные файлы, созданные только что двумя задачами, не могут быть повторно использованы; вместо этого можно только создавать новые выходные файлы.

Для достижения эффекта слияния выходных файлов сначала должен быть выполнен пакет задач, а затем должен быть выполнен следующий пакет задач для повторного использования предыдущих выходных файлов; он отвечает за одновременное выполнение нескольких пакетов задач, но его нельзя использовать повторно.

Отрегулируйте соотношение памяти кэш-памяти на стороне карты и памяти на стороне уменьшения.

Буфер памяти по умолчанию на стороне карты составляет 32 КБ на задачу. Совокупный коэффициент памяти на стороне сокращения составляет 0,2, что составляет 20%.

Если задача на стороне карты, объем обрабатываемых данных относительно велик, но размер буфера памяти фиксирован. Что может случиться?

Каждая задача обрабатывает 320кб, 32кб, итого 320/32=10 раз переполнения на диск.
Каждая задача обрабатывает 32000кб, 32кб, итого 32000/32=1000 раз переполнения на диск.

В случае, если объем данных, обрабатываемых задачей карты, относительно велик, а буфер памяти вашей задачи по умолчанию относительно мал, 32 КБ. Это может привести к множественным операциям переполнения со стороны карты в файл на диске, что приведет к большому объему операций ввода-вывода на диске, что приведет к снижению производительности.

Уменьшить агрегированную память, пропорцию. По умолчанию 0,2. Если объем данных относительно велик, а задача редукции извлекает много данных, часто бывает так, что совокупной памяти на стороне редукции недостаточно, часто возникают операции сброса и на диск записывается переполнение. И самое ужасное, что чем больше объем данных, записанных при переполнении диска, при последующем выполнении операции агрегации данные на диске, скорее всего, будут многократно считаны для агрегации. По умолчанию не настраивается, в случае большого объема данных может часто происходить чтение и запись файлов на диске на стороне сокращения.

Причина, по которой эти два пункта объединены, заключается в том, что они взаимосвязаны. При увеличении объема данных обязательно будут какие-то проблемы на стороне карты, обязательно будут какие-то проблемы на стороне редукции, проблемы те же, то есть дисковый IO частый и увеличивается, что влияет на производительность.

Как настроить

调节map task内存缓冲:
new SparkConf().set("spark.shuffle.file.buffer", "64")
默认32k(spark 1.3.x不是这个参数,后面还有一个后缀,kb;spark 1.5.x以后,变了,就是现在这个参数)

调节reduce端聚合内存占比:
new SparkConf().set("spark.shuffle.memoryFraction", "0.3")
默认0.2

В реальной производственной среде, когда мы настраиваем два параметра?

Посмотрите на пользовательский интерфейс Spark.Если ваша компания решит использовать автономный режим, это очень просто.Когда ваша искра запустится, она отобразит адрес пользовательского интерфейса Spark и порт 4040. Зайдите и посмотрите его, нажмите на него в очередь, и вы можете увидеть, что каждая из ваших деталей каждого этапа, какие исполнители есть, какие задачи есть, объем случайной записи и чтения каждой задачи, диск и память случайного воспроизведения, объем прочитанных данных и написано; если он отправлен в режиме пряжи, из интерфейса пряжи Войдите, щелкните соответствующее приложение, войдите в пользовательский интерфейс Spark и просмотрите подробности.

Если вы обнаружите, что скорость записи и чтения диска в случайном порядке очень велика. В настоящее время это означает, что лучше настроить некоторые параметры перемешивания. настраивать. Прежде всего, конечно, рассмотрите возможность открытия механизма слияния выходных файлов на стороне карты.

Отрегулируйте два параметра, упомянутых выше. Принципы при настройке. spark.shuffle.file.buffer, каждый раз удваивать размер, а потом смотреть на эффект, 64, 128; spark.shuffle.memoryFraction, каждый раз увеличивать на 0,1, смотреть на эффект.

Его нельзя настроить слишком сильно.Если он слишком большой, то будет слишком много.Поскольку ресурсы памяти ограничены, если вы настроите здесь слишком много, возникнут проблемы с использованием памяти другими ссылками.

Настройка SortShuffleManager

//阈值设置
new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "550")
  • После spark 1.5.x появился новый менеджер для менеджера перетасовки, tungsten-sort (вольфрамовая проволока), менеджер сортировки вольфрамовой проволоки. На официальном сайте обычно говорится, что менеджер сортировки вольфрамовой проволоки имеет тот же эффект, что и менеджер сортировки. Однако единственное отличие состоит в том, что диспетчер tungsten wire использует набор механизмов управления памятью, реализованных сам по себе, что значительно повышает производительность и позволяет избежать большого объема памяти OOM, GC и других исключений, генерируемых в процессе тасования.

Подводя итог, теперь это равносильно разговору о перетасовке искр. Все понимают глубже. хэш, сортировка, вольфрам-сортировка. Как выбрать?

  1. Нужно ли, чтобы spark сортировал данные за вас по умолчанию? Как и в случае с mapreduce, по умолчанию используется сортировка по ключу. На самом деле, если он вам не нужен, рекомендуется использовать самый простой HashShuffleManager, потому что первое соображение — это не сортировка в обмен на высокую производительность;

  2. Когда вам нужен менеджер сортировки? Если вам нужно, чтобы ваши данные сортировались по ключу, то выберите это, и учтите, что количество задач редукции должно быть больше 200, чтобы мог работать механизм сортировки и слияния (объединения нескольких файлов в один). Однако здесь следует отметить, что вы должны продумать для себя, нужно ли это делать в процессе перемешивания, ведь это повлияет на производительность.

  3. Если вам не нужна сортировка, и вы хотите, чтобы файлы, выводимые каждой задачей, были объединены в одну копию, вы думаете, что можете уменьшить нагрузку на производительность; вы можете настроить порог bypassMergeThreshold, например, количество ваших задач сокращения 500 пороговое значение по умолчанию равно 200, поэтому сортировка и прямое слияние будут выполняться по умолчанию, порог можно настроить на 550, и сортировка выполняться не будет. наконец-то слились в один файл. (Обязательно напомним всем, что этот параметр, на самом деле, мы его обычно не используем в продакшн-среде, и мы не проверяли, насколько производительность можно улучшить таким образом)

  4. Если вы хотите использовать менеджер тасования на основе сортировки, а искровая версия вашей компании относительно высока, это версия 1.5.x, то вы можете попробовать использовать менеджер тасования сортировки на основе вольфрама. Обратите внимание на улучшение производительности и стабильности.

Суммировать:

  1. В производственной среде не рекомендуется опрометчиво использовать третий и четвертый пункты:
  2. Если вы не хотите, чтобы ваши данные сортировались во время перемешивания, то настройте его самостоятельно и используйте менеджер перемешивания хэшей.
  3. Если вам действительно нужно, чтобы ваши данные сортировались во время перемешивания, то вам не нужно делать это по умолчанию, по умолчанию используется диспетчер сортировки в случайном порядке; или что? Если вам вообще плевать на сортировку, то пусть он сортируется по умолчанию. Настройте некоторые другие параметры (механизм консолидации). (80%, все используют это)
spark.shuffle.manager:hash、sort、tungsten-sort

new SparkConf().set("spark.shuffle.manager", "hash")
new SparkConf().set("spark.shuffle.manager", "tungsten-sort")
// 默认就是,new SparkConf().set("spark.shuffle.manager", "sort")