Решение проблемы с маленькими файлами Spark SQL в OPPO

Spark

Эта статья подготовлена ​​командой OPPO Internet Basic Technology Team, пожалуйста, укажите автора для перепечатки. В то же время приглашаем обратить внимание на нашу общедоступную учетную запись: OPPO_tech, чтобы поделиться с вами передовыми интернет-технологиями и деятельностью OPPO.

Небольшие файлы Spark SQL относятся к файлам, размер которых значительно меньше размера блока hdfs. Слишком большое количество мелких файлов приведет к серьезным узким местам в производительности HDFS и создаст серьезные проблемы со стабильностью задач и обслуживанием кластеров.

Вообще говоря, задачи MR, запланированные Hive, могут просто установить следующие параметры для объединения небольших файлов, чтобы решить проблему небольших файлов, созданных задачами:

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=xxxx;
set hive.merge.smallfiles.avgsize=xxx;

Однако в процессе постепенного переноса задач автономного планирования из Hive в Spark, поскольку сам Spark не поддерживает функцию слияния небольших файлов, проблема небольших файлов становится все более заметной, что оказывает большое влияние на стабильность кластера и когда-то мешали нашей миграционной работе.

Чтобы решить проблему маленьких файлов, мы прошли разные этапы от непрерывной настройки параметров в начале до более поздней разработки кода.Вот вам простой обмен.

1. Почему Spark генерирует маленькие файлы

Количество файлов, генерируемых Spark, напрямую зависит от количества разделов в RDD и количества разделов таблицы. Обратите внимание, что две концепции разделов здесь разные: раздел RDD связан с параллелизмом задач, а раздел таблицы — это количество разделов Hive. Количество сгенерированных файлов обычно является произведением количества разделов RDD и количества разделов таблицы. Поэтому, когда параллелизм задачи слишком высок или количество разделов велико, легко сгенерировать много маленьких файлов.

Поэтому, если вам нужно настроить параметры для уменьшения количества генерируемых файлов, это может быть достигнуто только за счет уменьшения количества разделов RDD на последнем этапе (уменьшение количества разделов ограничено историческими данными и восходящими и нисходящими отношения, и это трудно изменить)

2. Решение на основе параметров версии сообщества

2.1 Простой статический раздел SQL без оператора Shuffle

Этот тип SQL относительно прост. В основном он предназначен для фильтрации части данных вышестоящей таблицы и записи их в нижестоящую таблицу или просто для объединения двух таблиц. Количество разделов для этой задачи в основном определяется количеством разделов. при чтении файлов.

  • Начиная со Spark 2.4 поддержка таблиц орков и паркета Hive была очень хорошей.Чтобы ускорить скорость работы, мы включили параметр для автоматического преобразования таблиц орков/паркетов Hive в DataSource. Для этого типа таблицы DataSource количество разделов в основном контролируется следующими тремя параметрами.
spark.sql.files.maxPartitionBytes;
spark.sql.files.opencostinbytes;
spark.default.parallelism;

Отношение показано на рисунке ниже, поэтому его можно настроить, настроив сегментирование входных данных, отрегулировав эти три параметра:

  • Вместо таблицы DataSource для чтения данных используется CombineInputFormat, поэтому настройка сегментации в основном выполняется через параметры MR: mapreduce.input.fileinputformat.split.minsize

Хотя мы можем скорректировать конечное количество файлов, настроив шардинг входных данных, такая корректировка нестабильна, и некоторые незначительные изменения в размере восходящих данных могут привести к повторной адаптации параметров.

Чтобы решить эту проблему просто и грубо, мы добавили в такой SQL намек на переразбиение и ввели новую перетасовку, чтобы количество файлов было фиксированным значением.

2.2 Статическая задача разбиения с оператором Shuffle

В ВЫПУСК СПАРК-9858 введен новый параметр: spark.sql.adaptive.shuffle.targetPostShuffleInputSize,

Позже, на основе адаптивной искры, этот параметр был дополнительно расширен.Количество разделов можно динамически регулировать, чтобы каждая задача максимально обрабатывала данные размером targetPostShuffleInputSize.Поэтому этот параметр также можно использовать для управления количеством файлов, созданных в определенной степени.

2.3 Задачи динамического разбиения

Для задач с динамическим разделом из-за переменного раздела трудно контролировать общее количество файлов, просто регулируя количество разделов на стороне rdd.

В улье мы можем установить hive.optimize.sort.dynamic.partition, чтобы облегчить ситуацию, когда узел задачи часто перемещается, когда задача выполняется из-за слишком большого количества файлов, созданных динамическим разделом. Такой параметр вводит новое перемешивание для изменения порядка данных и назначения одного и того же раздела одной и той же задаче для обработки, тем самым предотвращая одновременную работу задачи с несколькими файловыми дескрипторами.

Следовательно, мы можем использовать эту идею, чтобы использовать оператор распределения по для изменения sql для управления количеством файлов. В общем, если мы хотим генерировать не более N файлов на раздел, мы можем добавить DISTRIBUTE BY [столбец динамического раздела], ceil(rand() * N) в конце SQL.

3. Самостоятельно разработанная схема commitProtocol, которая может объединять файлы

Подводя итог, каждый метод имеет определенные недостатки, и многие правила также создают большие проблемы для бизнеса в процессе фактического использования.

Поэтому у нас возникла идея реализовать на стороне искры небольшой механизм слияния файлов, аналогичный улью. Среди нескольких возможных вариантов мы наконец выбрали: переписать метод spark.sql.sources.commitProtocolClass.

С одной стороны, это решение не мешает коду Spark, что удобно для обслуживания исходного кода Spark. В процессе использования бизнес-стороны требуются только простые настройки: spark.sql.sources.commitProtocolClass вы можете контролировать, включать ли слияние небольших файлов.

После включения параметра слияния небольших файлов мы получим все сгенерированные файлы на этапе фиксации и введем два новых задания для обработки этих файлов. Сначала получаем все файлы, размер которых меньше spark.compact.smallfile.size в первом задании, объединяем файлы по значению параметра spark.compact.size после завершения поиска и объединяем эти файлы во втором задании .