Ниже приводится отчет Гуо Цзюня, главы архитектуры хранилища данных ByteDance: «Основная практика оптимизации ByteDance на Spark SQL».
представление команды
обобщать
Введение в архитектуру Spark SQL
Вышеуказанная информация хранится в Каталоге. В производственной среде служба каталога обычно предоставляется хранилищем метаданных Hive. Анализатор преобразует неразрешенный логический план в разрешенный логический план в сочетании с каталогом.
Недостаточно, чтобы добраться сюда. SQL, написанный разными людьми, отличается, сгенерированный Resolved Logical Plan также отличается, и эффективность выполнения также отличается. Чтобы обеспечить эффективное выполнение, независимо от того, как пользователь пишет SQL, Spark SQL необходимо оптимизировать разрешенный логический план, который выполняется оптимизатором. Оптимизатор содержит ряд правил для выполнения эквивалентного преобразования разрешенного логического плана и, наконец, создания оптимизированного логического плана. Оптимизированный логический план не обязательно будет оптимальным в глобальном масштабе, но, по крайней мере, близок к оптимальному.
Потому что один и тот же логический оператор может иметь несколько физических реализаций. Например, существует несколько реализаций соединения, таких как ShuffledHashJoin, BroadcastHashJoin, BroadcastNestedLoopJoin, SortMergeJoin и т. д. Таким образом, Планировщик запросов может преобразовать Оптимизированный логический план в несколько физических планов. Выбор оптимального физического плана становится вопросом, который сильно влияет на конечную производительность выполнения. Лучшим способом является создание стоимостной модели, применение модели ко всем возможным физическим планам и выбор физического плана с наименьшей стоимостью в качестве окончательного выбранного физического плана.
Некоторые оптимизации, сделанные ByteDance в Spark SQL, представлены позже, в основном с упором на оптимизацию логического плана и оптимизацию физического плана, представленную в этом разделе.
Оптимизация движка Spark SQL
Улучшения объединения сегментов
На следующей диаграмме показан основной принцип SortMergeJoin. Таблица 1 и Таблица 2, представленные пунктирными прямоугольниками, представляют собой две таблицы, которые необходимо соединить определенным полем. Раздел от 0 до раздела m в пунктирной рамке — это разделы после преобразования таблицы в RDD, а не разделы таблицы. Предположим, что таблица 1 и таблица 2 содержат m и k разделов соответственно после преобразования в RDD. Чтобы присоединиться, необходимо использовать Shuffle, чтобы убедиться, что данные одного и того же ключа соединения находятся в одном и том же разделе и отсортированы по ключу в разделе, и в то же время убедиться, что Таблица 1 и Таблица 2 имеют одинаковые значения. количество разделов RDD после перемешивания.
Для сценариев больших данных данные обычно записываются один раз и запрашиваются несколько раз. Если вы часто присоединяетесь к двум столам одним и тем же или похожим способом, вам нужно каждый раз платить цену Shuffle. Вместо этого лучше разрешить распределение данных таким образом, чтобы это способствовало объединению при записи данных, чтобы не было необходимости выполнять перемешивание при объединении. Как показано на рисунке ниже, данные в таблице 1 и таблице 2 группируются в соответствии с одним и тем же ключом, количество сегментов равно n, и сегменты сортируются по ключу. При объединении этих двух таблиц вы можете избежать случайного воспроизведения и напрямую запустить n задач для объединения.
Улучшение первое:Поддержка совместимости с Hive
Чтобы решить эту проблему, мы позволяем Spark SQL поддерживать режим совместимости с Hive, чтобы гарантировать, что таблица Bucket, написанная Spark SQL, имеет тот же эффект, что и таблица Bucket, написанная Hive, и этот тип таблицы можно использовать в качестве таблицы. Таблица сегментов Hive и Spark SQL для соединения сегментов. Перемешивание не требуется. Таким образом гарантируется прозрачная миграция Hive в Spark SQL.
Улучшение два:Поддержка нескольких отношений Bucket Join
В первом методе количество задач равно количеству сегментов в маленькой таблице. Как показано на рисунке ниже, таблица A содержит 3 сегмента, а таблица B — 6 сегментов. В это время набор данных корзины 0 и корзины 3 таблицы B должен быть объединен с корзиной 0 таблицы A. В этом случае можно запустить 3 Задания. Среди них Задача 0 объединяет сегмент 0 таблицы A и сегмент 0 + сегмент 3 таблицы B. Здесь необходимо выполнить еще одну сортировку слиянием данных корзины 0 и корзины 3 таблицы B, чтобы обеспечить порядок коллекции.
Если количество сегментов в таблице A и таблице B не сильно отличается, вы можете использовать описанный выше метод. Если количество сегментов в таблице B в 10 раз превышает количество сегментов в сегменте A, хотя описанный выше метод не использует Shuffle, он может быть медленнее, чем SortMergeJoin, включая Shuffle, из-за недостаточного параллелизма. В это время можно использовать другой метод, то есть количество задач равно количеству сегментов в большой таблице, как показано на следующем рисунке.
Улучшение третье:Поддержка перехода на BucketJoin
В то же время из-за быстрого увеличения объема данных средний размер корзины также быстро увеличивался. Это приведет к тому, что объем данных, обрабатываемых одной задачей, будет слишком большим, и эффект от использования Bucket может быть не таким хорошим, как при непосредственном соединении на основе Shuffle.
Улучшение четвертое:поддержка суперсет
Как показано на рисунке ниже, таблица X и таблица Y разделены на сегменты в соответствии с полем A. Запрос должен объединить таблицу X и таблицу Y, а набор ключей соединения — это A и B. В настоящее время, поскольку данные A равны, а идентификаторы сегментов в двух таблицах одинаковы, идентификаторы сегментов равных данных в A и B должны быть одинаковыми в двух таблицах, чтобы распределение данных соответствовало требованиям. Присоединяйтесь и не нуждается в перемешивании. В то же время Bucket Join также должен гарантировать, что две таблицы отсортированы в соответствии с набором ключей соединения, а именно A и B. В этом случае в разделе необходимо отсортировать только таблицы X и Y. Поскольку обе стороны уже отсортированы по полю A, в настоящее время сортировка по полям A и B относительно дешева.
материализованный столбец
Когда Spark SQL обрабатывает данные вложенных типов, возникают следующие проблемы:
- читать много ненужных данных: Для столбцовых форматов хранения, таких как Parquet/ORC, могут быть прочитаны только обязательные поля, а другие поля могут быть пропущены напрямую, что значительно экономит ввод-вывод. Для полей вложенных типов данных, таких как поле people типа Map на следующем рисунке, часто необходимо только прочитать его подполя, такие как people.age. Однако необходимо прочитать все поля людей всего типа карты, а затем извлечь поле people.age. Это вводит много бессмысленных накладных расходов на ввод-вывод. В нашем сценарии много полей типа Map, и многие из них содержат от десятков до сотен ключей, а это значит, что IO усиливается в десятки-сотни раз.
- Невозможно выполнить векторизованное чтение: И векторизованное чтение может значительно повысить производительность. Но на данный момент (26 октября 2019 г.) Spark не поддерживает векторизованное чтение с вложенными типами данных. Это сильно влияет на производительность запросов, содержащих вложенные типы данных.
Выталкивание фильтра не поддерживается: В настоящее время (26 октября 2019 г.) Spark не поддерживает раскрывающийся фильтр для полей вложенного типа.
Повторный расчет: поле JSON, которое существует как тип String в Spark SQL и не является строго вложенным типом данных. Однако на практике это также часто используется для сохранения нескольких полей, которые не являются фиксированными.При запросе целевое подполе извлекается через JSON Path, а извлечение поля больших строк JSON потребляет много ресурсов ЦП. Для таблицы горячих точек очень расточительно часто и многократно извлекать одно и то же подполе.
Для этой проблемы студенты, занимающиеся хранилищами данных, также придумали несколько решений. Как показано на рисунке ниже, таблица с именем sub_table создается вне таблицы с именем base_table, а часто используемое подполе people.age задается как дополнительное поле типа Integer. Нисходящий поток больше не запрашивает people.age через base_table, а вместо этого использует поле age в sub_table. Таким образом, запрос к полю вложенного типа превращается в запрос к полю примитивного типа, и одновременно решаются вышеуказанные проблемы.
- Поддерживается дополнительная таблица, что приводит к большим дополнительным затратам памяти/вычислений.
- Исторические данные вновь добавленных полей не могут быть запрошены в новой таблице (если вы хотите поддерживать запрос исторических данных, вам необходимо перезапустить историческое задание, что слишком дорого и неприемлемо).
- Сопровождающий таблицы должен изменить работу по вставке данных после изменения структуры таблицы.
- Стороне запроса ниже по течению необходимо изменить оператор запроса, и стоимость продвижения высока.
- Высокие эксплуатационные расходы: если изменяются часто используемые подполя, необходимо удалить независимые подполя, которые больше не нужны, и добавить новые подполя в качестве независимых полей. Перед удалением необходимо убедиться, что никакие последующие предприятия не используют это поле. Необходимо уведомить о новых добавленных полях и передать их нижестоящим бизнес-сторонам для использования новых полей.
- Добавьте поле примитивного типа, например поле age типа Integer, и укажите, что это материализованное поле people.age.
- При вставке данных данные автоматически генерируются для материализованного поля, а материализованное отношение сохраняется в параметре раздела. Поэтому операция вставки данных полностью прозрачна, и сопровождающему таблицы не нужно модифицировать существующую операцию.
- При запросе проверьте все разделы требуемого запроса, если они содержат материализованную информацию (сопоставление людей.возраст с возрастом), напрямую перепишите select people.age для выбора возраста, чтобы добиться полностью прозрачной оптимизации для нижестоящей стороны запроса. Также совместим с историческими данными.
На следующей диаграмме показаны преимущества использования материализованных столбцов в основной таблице:
В поле OLAP трудоемкие операции, такие как Group By и Aggregate/Join, часто выполняются с определенными фиксированными полями одной и той же таблицы, что приводит к большому количеству повторяющихся вычислений, пустой трате ресурсов и влиянию на производительность запросов, что не способствует для улучшения пользовательского опыта.
Как показано на изображении выше, история запросов показывает большое количество запросов, которые группируются по пользователю, а затем суммируются или подсчитываются по количеству. На этом этапе вы можете создать материализованное представление и выполнить gorup для пользователя и avg для num (среднее значение будет автоматически преобразовано в количество и сумму). Когда пользователь выполняет запрос select user, sum(num) к исходной таблице, Spark SQL автоматически переписывает запрос в запрос select user, sum_num к материализованному представлению.
Другие оптимизации в движке Spark SQL
Повышение стабильности Spark Shuffle и оптимизация производительности
Проблемы с Spark Shuffle
Как показано на рисунке выше, мы называем этап Shuffle upstream Mapper Stage, а Task в нем называется Mapper. Нисходящий этап Shuffle называется этапом редуктора, а задача в нем называется редуктором.
Каждый Mapper делит свои данные не более чем на N частей, где N — количество редукторов. Каждый редьюсер должен обратиться не более чем к M (количество картографов) картографов, чтобы получить свою часть данных.
- проблемы со стабильностью: данные Mapper's Shuffle Write хранятся на локальном диске Mapper, и существует только одна копия. Когда на машине произошел сбой диска или ввод-вывод переполнен, а ЦП переполнен, Редюсер не может прочитать данные, вызывая исключение FetchFailedException, которое, в свою очередь, приводит к повторной попытке этапа. Stage Retry увеличивает время выполнения задания и напрямую влияет на SLA. В то же время, чем больше время выполнения, тем больше вероятность того, что данные Shuffle не могут быть прочитаны, что, в свою очередь, вызовет больше повторных попыток выполнения. Такой цикл может привести к сбою выполнения больших заданий.
- проблемы с производительностью: данные каждого картографа будут считываться большим количеством редукторов, а разные части считываются случайным образом. Предполагая, что выходные данные Mapper в случайном порядке составляют 512 МБ и имеется 100 000 редукторов, средние данные, считываемые каждым редуктором, составляют 512 МБ / 100 000 = 5,24 КБ. Кроме того, разные редукторы считывают данные параллельно. Для выходного файла Mapper выполняется множество случайных чтений. Производительность произвольного ввода-вывода жесткого диска намного ниже, чем последовательный ввод-вывод. Последнее явление заключается в том, что Reducer считывает данные Shuffle очень медленно, что отражается в метриках, свидетельствующих о том, что время блокировки чтения в случайном порядке в Reducer больше, даже на его долю приходится более половины всего времени выполнения Reducer, как показано на следующем рисунке.
Улучшение стабильности в случайном порядке на основе HDFS
Как показано на рисунке выше, загрузка ЦП машины близка к 100 %, из-за чего служба Spark External Shuffle в диспетчере узлов на стороне Mapper не может вовремя предоставить службу Shuffle.
Независимо от причины, суть проблемы в том, что данные случайной записи на стороне Mapper хранятся только локально.Как только проблема возникает на узле, все данные произвольной записи на узле не могут быть прочитаны Reducer. Общее решение этой проблемы — обеспечить доступность с помощью нескольких реплик.
Первоначальное простое решение заключается в том, что окончательные файлы данных и индексные файлы на стороне Mapper не записываются на локальный диск, а напрямую записываются в HDFS. Редуктор больше не считывает данные Shuffle через внешнюю службу Shuffle на стороне Mapper, а напрямую получает данные из HDFS, как показано на следующем рисунке.
После быстрой реализации этого решения мы провели несколько простых наборов тестов. Результаты показывают:
- Когда не так много картографов и редукторов, производительность чтения и записи Shuffle ничем не отличается от исходного решения.
- Когда есть много картографов и редукторов, чтение в случайном порядке становится очень медленным.
Причина в том, что в общей сложности 10 000 редукторов, файлы данных и индексные файлы должны быть прочитаны из 10 000 картографов, и в общей сложности необходимо прочитать 10 000 * 1 000 * 2 = 200 миллионов HDFS.
Если это всего лишь проблема с производительностью Name Node, существует несколько простых способов ее решения. Например, местоположение блока всех картографов сохраняется на стороне драйвера Spark, а затем драйвер транслирует информацию всем исполнителям.Каждый редуктор может напрямую получить расположение блока от исполнителя, а затем напрямую считывает данные из узла данных. без подключения к Name Node. Однако, учитывая многопоточную модель Data Node, это решение окажет большее влияние на Data Node.
Выходные данные Mapper's Shuffle по-прежнему записываются на локальный диск по оригинальной схеме, а после записи загружаются в HDFS. Редуктор по-прежнему считывает данные Shuffle через External Shuffle Service на стороне Mapper по исходной схеме. Если это не удается, прочитайте из HDFS. Эта схема значительно снижает частоту доступа к HDFS.
Программа существует уже почти год:
- Охватывает более 57% данных Spark Shuffle.
- Это повышает общую производительность заданий Spark на 14%.
- Производительность крупномасштабных операций увеличена на 18%.
Почасовая производительность труда увеличилась на 12%.
Это решение направлено на повышение стабильности Spark Shuffle и, таким образом, на повышение стабильности работы, но в конечном итоге не использует такие показатели, как дисперсия, для измерения улучшения стабильности. Причина в том, что нагрузка на кластер каждый день разная, и общая дисперсия велика. После улучшения стабильности Shuffle число повторных попыток значительно сокращается, а общее время выполнения задания сокращается, то есть повышается производительность. Наконец, повышение производительности сравнивается путем сравнения общего времени выполнения задания до и после использования схемы для измерения эффекта схемы.
Как проанализировано выше, причина проблемы с производительностью Shuffle заключается в том, что Shuffle Write выполняется Mapper, а затем Reducer должен считывать данные со всех Mapper. Эту модель мы называем Mapper-centric Shuffle. Проблема с ним заключается в следующем:
- На стороне Mapper будет M операций ввода-вывода с последовательной записью.
- На стороне картографа будет M * N * 2 операций ввода-вывода со случайным чтением (это самое большое узкое место в производительности).
- Внешняя служба Shuffle на стороне Mapper должна располагаться на том же компьютере, что и Mapper, который не может эффективно разделить хранилище и вычисления, а службу Shuffle нельзя расширять независимо.
- Превратите M * N * 2 случайных операций ввода-вывода в N последовательных операций ввода-вывода.
- Службу Shuffle можно развернуть независимо от Mapper или Reducer, чтобы добиться независимого расширения и разделения хранилища и вычислений.
- Служба Shuffle может напрямую хранить данные в хранилище с высокой доступностью, таком как HDFS, что одновременно решает проблему стабильности Shuffle.
Основные моменты контроля качества
Ответ: Исторических данных слишком много, и изменять исторические данные нецелесообразно.
Ответ: Вообще говоря, данные, измененные пользователем, находятся в единице раздела. Поэтому мы сохраняем информацию о материализованном столбце в параметре раздела. Если запрос пользователя содержит как новый раздел, так и исторический раздел, мы выполним SQL Rewrite для материализованного столбца в новом разделе, но не в историческом разделе, а затем объединим новый и старый разделы, чтобы обеспечить правильность как можно больше данных. Воспользуйтесь преимуществами материализованных столбцов.
Ответ: В настоящее время мы в основном используем некоторую аудиторскую информацию для облегчения ручного анализа. В то же время мы также делаем рекомендательные службы для материализованных столбцов и материализованных представлений и, наконец, добиваемся интеллектуального построения материализованных столбцов и материализованных представлений.
Ответ: Идея хорошая, мы ее уже рассматривали, но по ряду соображений так и не сделали в итоге. Во-первых, объем выходных данных Shuffle для одного Mapper, как правило, невелик, и загрузка в HDFS занимает менее 2 секунд, и на это время можно не обращать внимания.Во-вторых, мы широко используем службу External Shuffle Service и Dynamic Allocation. Mapper выполняется, Executor может быть Recycling, если вы хотите загружать асинхронно, вы должны полагаться на другие компоненты, что увеличит сложность и снизит ROI.
Еще больше чудесного обмена
Обзор Шанхайского салона | Как ByteDance оптимизировала платформу HDFS с 10 000 узлов
Обзор салона в Шанхае | Применение Redis Cache в сценариях больших данных
Салон технологий ByteDance
Технологический салон ByteDance приглашает технических экспертов из ByteDance и интернет-компаний отрасли для обмена горячими техническими темами и передовым практическим опытом, охватывающим архитектуру, большие данные, интерфейс, тестирование, эксплуатацию и обслуживание, алгоритмы, системы и другие технические области. .
ByteDance Technology Salon стремится предоставить открытую и бесплатную платформу обмена и обучения для талантов в технической области, чтобы помочь техническим специалистам учиться, расти и продолжать развиваться.