Это 22-й день моего участия в Gengwen Challenge, Подробную информацию о мероприятии см.:Обновить вызов
Четвертая настройка оператора: использование фильтра и объединения
В задачах Spark мы часто используем оператор фильтра для фильтрации данных в RDD.На начальном этапе задачи объем данных, загружаемых из каждого раздела, одинаков, но после фильтрации с помощью фильтра объем данных в каждом разделе Там могут быть большие различия, как показано на рисунке:
- Согласно информации на рисунке, мы можем найти две проблемы:
-
Объем данных в каждом разделе стал меньше, если текущие данные обрабатываются по количеству задач, равному предыдущему разделу, это приведет к трате вычислительных ресурсов задачи;
-
Объем данных в каждом разделе разный, поэтому каждая задача будет обрабатывать разный объем данных, когда каждая последующая задача обрабатывает данные в каждом разделе, что, вероятно, приведет к искажению данных.
-
Как показано на рисунке выше, после фильтрации второй секции остается только 100 единиц данных, а после фильтрации третьей секции остается 800. При той же логике обработки количество данных, обрабатываемых задачей, соответствующей второй раздел Разница в количестве данных, обрабатываемых задачей, соответствующей третьему разделу, составляет 8 раз, что также приведет к разнице в скорости выполнения в несколько раз, что является проблемой перекоса данных.
-
Ввиду указанных выше двух проблем, проанализируем их отдельно:
-
По первому вопросу, поскольку объем данных в разделе стал меньше, мы надеемся, что данные раздела можно будет перераспределить, например, преобразовать данные исходных 4 разделов в 2 раздела, чтобы нам нужно было использовать только следующий две задачи на обработку, то есть избегается трата ресурсов.
-
Для второй проблемы решение очень похоже на решение первой проблемы.Данные раздела перераспределяются таким образом, чтобы объем данных в каждом разделе был одинаковым, что позволяет избежать проблемы перекоса данных.
-
Итак, как следует реализовать вышеуказанное решение? Нам нужен оператор объединения.
Для повторного разбиения можно использовать как repartition, так и объединение, где перераспределение — это просто простая реализация перетасовки в интерфейсе объединения, правда, объединение не выполняет перемешивание по умолчанию, но это можно установить с помощью параметров.
- Предположим, мы хотим изменить исходное количество разделов A на B путем переразметки, тогда возможны следующие ситуации:
-
A > B (большинство разделов объединены в меньшинство)
- Разница между A и B невелика: В это время вы можете использовать объединение без процесса перемешивания.
- Разница между A и B очень велика: В это время можно использовать объединение и процесс перемешивания не включен, но производительность процесса объединения будет низкой, поэтому рекомендуется установить второй параметр объединения в значение true. , то есть запустить процесс перемешивания.
-
A
- В это время можно использовать перераспределение.Если используется объединение, необходимо установить в случайном порядке значение true, иначе объединение будет недействительным.
После операции фильтра мы можем использовать оператор объединения для сжатия количества разделов, когда объем данных каждого раздела отличается, и сделать объем данных каждого раздела как можно более однородным и компактным, чтобы последующие задачи могли выполнять вычисления. В определенной степени производительность может быть улучшена до определенной степени.
- В это время можно использовать перераспределение.Если используется объединение, необходимо установить в случайном порядке значение true, иначе объединение будет недействительным.
-
Примечание. Локальный режим имитирует работу кластера внутри процесса и уже оптимизировал степень параллелизма и количество разделов внутри, поэтому нет необходимости устанавливать степень параллелизма и количество разделов.
Пятая настройка оператора: предварительная агрегация reduceByKey
По сравнению с обычной операцией перемешивания важной особенностью метода reduceByKey является то, что он выполняет локальную агрегацию на стороне карты.Сначала карта выполняет операцию объединения локальных данных, а затем записывает данные в файл, созданный каждой задачей. следующего этапа, то есть на стороне карты оператор-функция reduceByKey выполняется для значения, соответствующего каждому ключу. Процесс выполнения оператора reduceByKey показан на рисунке:
- Повышение производительности при использовании reduceByKey выглядит следующим образом:
- После локальной агрегации объем данных на стороне карты уменьшается, что снижает дисковый ввод-вывод и уменьшает занимаемое дисковое пространство;
- После локальной агрегации объем данных, получаемых на следующем этапе, уменьшается, что снижает объем данных, передаваемых по сети;
- После локальной агрегации использование памяти для кэширования данных на стороне сокращения уменьшается;
- После локальной агрегации объем данных, подлежащих агрегированию на стороне редукции, уменьшается.
Основываясь на функции локального агрегирования в reduceByKey, нам следует рассмотреть возможность использования reduceByKey вместо других операторов тасования, таких как groupByKey. Принципы работы reduceByKey и groupByKey показаны на рисунке:
принцип groupByKey
принцип сокращения по ключу
Согласно приведенному выше рисунку, groupByKey не выполняет агрегацию на стороне карты, а перемешивает все данные на стороне карты на стороне редукции, а затем выполняет операции агрегации данных на стороне редукции. Так как у reduceByKey есть функция агрегации на стороне карты, объем данных, передаваемых по сети, уменьшается, поэтому эффективность значительно выше, чем у groupByKey.