Это 18-й день моего участия в Gengwen Challenge.Подробности о мероприятии, пожалуйста, проверьте:Обновить вызов
Общая настройка производительности 4: Настройка параллелизма
Степень параллелизма в задании Spark относится к количеству задач на каждом этапе.
Если настройка параллелизма необоснованна и параллелизм слишком низкий, это приведет к большой трате ресурсов, например, если Исполнителей 20, каждому Исполнителю выделяется 3 ядра ЦП, а задание Spark имеет 40 задач, так что каждому исполнителю назначается количество задач, равное 2, что приводит к тому, что у каждого исполнителя одно ядро процессора простаивает, что приводит к пустой трате ресурсов.
Идеальная настройка параллелизма должна соответствовать параллелизму с ресурсами.Короче говоря, параллелизм должен быть установлен настолько большим, насколько это позволяют ресурсы, чтобы можно было полностью использовать ресурсы кластера. Разумная настройка параллелизма может повысить производительность и скорость выполнения всего задания Spark.
Spark официально рекомендует, чтобы количество задач было в 2–3 раза больше общего количества ядер ЦП в задании Spark. Причина, по которой не рекомендуется, чтобы количество задач равнялось общему количеству ядер процессора, заключается в том, что время выполнения задач различно.Некоторые задачи выполняются быстро, а некоторые задачи выполняются медленно.Если количество задач равно общее количество ядер ЦП выполняется быстрая задача, после чего ядро ЦП будет простаивать. Если количество задач в 2–3 раза превышает общее количество ядер ЦП, то после выполнения задачи ядро ЦП немедленно выполнит следующую задачу, что снижает потери ресурсов и повышает эффективность заданий Spark.
Параллелизм заданий Spark устанавливается следующим образом:
val conf = new SparkConf()
.set("spark.default.parallelism", "500")
Общая настройка производительности пять: широковещательная большая переменная
По умолчанию, если внешняя переменная используется в операторе в задаче, каждая задача получит копию переменной, которая приводит к огромному потреблению памяти. С одной стороны, если RDD впоследствии сохраняется, данные RDD могут не храниться в памяти, но могут быть записаны только на диск, и дисковый IO будет серьезно потреблять производительность; с другой стороны, когда задача создает задачу Объект может найти кучу памяти не может хранить вновь созданные объекты, которые приведут к частым GC, что приведет к тому, что рабочие потоки останавливаются, что, в свою очередь, вызывает искру, чтобы приостановить работу в течение определенного периода времени, что серьезно влияет на работу Выступление искры.
Предположим, что текущая задача настроена с 20 Исполнителями, указано 500 задач и переменная 20M является общей для всех задач.В это время в 500 задачах будет сгенерировано 500 копий, потребляющих 10G памяти в кластере.Если широковещательные переменные используются, то каждый Executor сохраняет копию, которая потребляет в общей сложности 400M памяти, а потребление памяти уменьшается в 5 раз. Широковещательная переменная сохраняет копию в каждом Исполнителе, и все задачи этого Исполнителя совместно используют эту широковещательную переменную, что значительно уменьшает количество копий, генерируемых переменной.
На начальном этапе широковещательная переменная имеет копию только в Драйвере. Когда задача выполняется, она хочет использовать данные в широковещательной переменной. В это время она сначала попытается получить переменную в BlockManager, соответствующую ее локальному исполнителю. Если она не существует локально, BlockManager будет удаленно извлекать ее из Драйвера или Блок-Менеджера других узлов Копией переменной управляет локальный Блок-Менеджер, после этого все задачи этого Исполнителя будут получать переменную напрямую из локального Блок-Менеджера.
Шестая настройка общей производительности: сериализация Kryo
По умолчанию Spark использует механизм сериализации Java. Механизм сериализации Java прост в использовании и не требует дополнительной настройки.Переменные, используемые в операторе, могут реализовывать интерфейс Serializable.Однако механизм сериализации Java неэффективен, скорость сериализации низкая, а сериализованные данные по-прежнему занимают место. большой.
Производительность механизма сериализации Kryo примерно в 10 раз выше, чем у механизма сериализации Java.Причина, по которой Spark не использует Kryo в качестве библиотеки классов сериализации по умолчанию, заключается в том, что он не поддерживает сериализацию всех объектов, а Kryo требует от пользователей регистрации и сериализовать перед использованием Тип недостаточно удобен, но, начиная с Spark 2.0.0, Shuffling RDD простого типа, массива простого типа и строкового типа по умолчанию использовали сериализацию Kryo.
public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}
Пример кода для настройки kryo сериализации
//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");