Основная практика оптимизации ByteDance на Spark SQL | Технологический салон ByteDance

Архитектура Spark
Основная практика оптимизации ByteDance на Spark SQL | Технологический салон ByteDance
26 октября,Салон технологий ByteDance | Сессия по архитектуре больших данных Он успешно завершился в штаб-квартире ByteDance в Шанхае. Мы пригласили Гуо Цзюня, руководителя архитектуры хранилища данных ByteDance, инженера по исследованиям и разработкам в области больших данных Kyligence — Тао Цзятао, инженера по хранению данных ByteDance — Сюй Минмина и старшего технического эксперта Alibaba Cloud — Бай Чена.

Ниже приводится отчет Гуо Цзюня, главы архитектуры хранилища данных ByteDance: «Основная практика оптимизации ByteDance на Spark SQL».

представление команды

Группа архитектуры хранилища данных отвечает за проектирование архитектуры области хранилища данных, поддерживая почти все линейки продуктов ByteDance (включая, помимо прочего, Douyin, Toutiao, Watermelon Video, Volcano Video), требования хранилища данных, такие как Spark SQL / Druid. вторичная разработка и оптимизация.

обобщать

Сегодняшняя публикация разделена на три части: первая часть представляет собой введение в архитектуру SparkSQL, вторая часть знакомит с практикой оптимизации ByteDance на движке SparkSQL, а третья часть представляет собой улучшение стабильности и оптимизацию производительности ByteDance в практике Spark Shuffle и исследование.

Введение в архитектуру Spark SQL

Кратко поговорим об архитектуре Spark SQL. На следующем рисунке показаны этапы, которые должен пройти SQL после отправки.Объединив эти этапы, вы увидите, какие ссылки можно оптимизировать.



Студенты, которые занимаются моделированием хранилища данных, часто предпочитают писать SQL напрямую, а не использовать DSL Spark. После отправки SQL он будет проанализирован синтаксическим анализатором и преобразован в неразрешенный логический план. Его основное внимание уделяется логическому плану или логическому плану, который описывает, какой запрос вы хотите выполнить. Неразрешенный означает, что некоторая информация, относящаяся к запросу, неизвестна, например схема и расположение данных целевой таблицы запроса.

Вышеуказанная информация хранится в Каталоге. В производственной среде служба каталога обычно предоставляется хранилищем метаданных Hive. Анализатор преобразует неразрешенный логический план в разрешенный логический план в сочетании с каталогом.

Недостаточно, чтобы добраться сюда. SQL, написанный разными людьми, отличается, сгенерированный Resolved Logical Plan также отличается, и эффективность выполнения также отличается. Чтобы обеспечить эффективное выполнение, независимо от того, как пользователь пишет SQL, Spark SQL необходимо оптимизировать разрешенный логический план, который выполняется оптимизатором. Оптимизатор содержит ряд правил для выполнения эквивалентного преобразования разрешенного логического плана и, наконец, создания оптимизированного логического плана. Оптимизированный логический план не обязательно будет оптимальным в глобальном масштабе, но, по крайней мере, близок к оптимальному.

Вышеупомянутый процесс связан только с SQL и запросами, но не имеет ничего общего со Spark, поэтому его нельзя напрямую отправить в Spark для выполнения. Планировщик запросов отвечает за преобразование оптимизированного логического плана в физический план, который затем может выполняться непосредственно Spark.

Потому что один и тот же логический оператор может иметь несколько физических реализаций. Например, существует несколько реализаций соединения, таких как ShuffledHashJoin, BroadcastHashJoin, BroadcastNestedLoopJoin, SortMergeJoin и т. д. Таким образом, Планировщик запросов может преобразовать Оптимизированный логический план в несколько физических планов. Выбор оптимального физического плана становится вопросом, который сильно влияет на конечную производительность выполнения. Лучшим способом является создание стоимостной модели, применение модели ко всем возможным физическим планам и выбор физического плана с наименьшей стоимостью в качестве окончательного выбранного физического плана.

Физический план можно напрямую преобразовать в RDD для выполнения Spark. Мы часто говорим, что "план не поспевает за изменениями". В процессе выполнения может быть обнаружено, что первоначальный план не оптимален. Если последующий план выполнения можно скорректировать в соответствии со статистикой времени выполнения, общая эффективность выполнения может быть улучшен. Эта часть динамической настройки выполняется Adaptive Execution.

Некоторые оптимизации, сделанные ByteDance в Spark SQL, представлены позже, в основном с упором на оптимизацию логического плана и оптимизацию физического плана, представленную в этом разделе.

Оптимизация движка Spark SQL

Улучшения объединения сегментов

В Spark фактически нет оператора Bucket Join. Упомянутое здесь объединение сегментов обычно относится к SortMergeJoin, для которого не требуется перемешивание.

На следующей диаграмме показан основной принцип SortMergeJoin. Таблица 1 и Таблица 2, представленные пунктирными прямоугольниками, представляют собой две таблицы, которые необходимо соединить определенным полем. Раздел от 0 до раздела m в пунктирной рамке — это разделы после преобразования таблицы в RDD, а не разделы таблицы. Предположим, что таблица 1 и таблица 2 содержат m и k разделов соответственно после преобразования в RDD. Чтобы присоединиться, необходимо использовать Shuffle, чтобы убедиться, что данные одного и того же ключа соединения находятся в одном и том же разделе и отсортированы по ключу в разделе, и в то же время убедиться, что Таблица 1 и Таблица 2 имеют одинаковые значения. количество разделов RDD после перемешивания.

Как показано на рисунке ниже, после Shuffle необходимо запустить только n задач, и каждая задача обрабатывает данные соответствующего раздела в таблице 1 и таблице 2 для соединения. Например, Задаче 0 нужно только последовательно сканировать левый и правый раздел 0 после перемешивания, чтобы завершить соединение.



Преимущество этого метода заключается в том, что он применим к широкому спектру сценариев и может использоваться для набора данных практически любого размера. Недостатком является то, что при каждом объединении необходимо перемешивать весь объем данных, а перемешивание — это ссылка, которая больше всего влияет на производительность Spark SQL. Если можно избежать Shuffle, производительность Spark SQL может быть значительно улучшена.

Для сценариев больших данных данные обычно записываются один раз и запрашиваются несколько раз. Если вы часто присоединяетесь к двум столам одним и тем же или похожим способом, вам нужно каждый раз платить цену Shuffle. Вместо этого лучше разрешить распределение данных таким образом, чтобы это способствовало объединению при записи данных, чтобы не было необходимости выполнять перемешивание при объединении. Как показано на рисунке ниже, данные в таблице 1 и таблице 2 группируются в соответствии с одним и тем же ключом, количество сегментов равно n, и сегменты сортируются по ключу. При объединении этих двух таблиц вы можете избежать случайного воспроизведения и напрямую запустить n задач для объединения.




ByteDance внесла четыре основных улучшения в BucketJoin Spark SQL.

Улучшение первое:Поддержка совместимости с Hive

За прошедший период времени ByteDance перенесла большое количество заданий Hive в SparkSQL. Hive несовместим с таблицей Bucket Spark SQL. В сценариях, где используются таблицы сегментов, если вычислительный движок обновляется напрямую, данные, записанные в таблицу сегментов Hive с помощью Spark SQL, не будут использоваться в качестве таблицы сегментов для объединения сегментов последующими заданиями Hive, что приведет к увеличению времени выполнения задания и возможно влияет на SLA.

Чтобы решить эту проблему, мы позволяем Spark SQL поддерживать режим совместимости с Hive, чтобы гарантировать, что таблица Bucket, написанная Spark SQL, имеет тот же эффект, что и таблица Bucket, написанная Hive, и этот тип таблицы можно использовать в качестве таблицы. Таблица сегментов Hive и Spark SQL для соединения сегментов. Перемешивание не требуется. Таким образом гарантируется прозрачная миграция Hive в Spark SQL.

Первая проблема, которую необходимо решить, заключается в том, что Bucket of Hive обычно содержит только один файл, тогда как Bucket of Spark SQL может содержать несколько файлов. Решение состоит в динамическом добавлении Shuffle с ключом Bucket в качестве ключа и с той же степенью параллелизма, что и количество Bucket.




Вторая проблема, которую необходимо решить, заключается в том, что метод хеширования Hive 1.x отличается от метода Spark SQL 2.x (Murmur3Hash), поэтому идентификатор сегмента одних и тех же данных в Hive отличается от идентификатора в Spark SQL. , И не может присоединиться напрямую. В режиме совместимости с Hive мы решаем эту проблему, заставляя вышеупомянутый динамически добавленный Shuffle использовать тот же метод хеширования, что и Hive.

Улучшение два:Поддержка нескольких отношений Bucket Join

Spark SQL требует, чтобы только таблицы с одним и тем же сегментом могли (необходимое, но недостаточное условие) использоваться для объединения сегментов. Для двух таблиц с большими различиями в размере, таких как таблица измерений в сотни ГБ и таблица фактов в десятки терабайт (одна секция), количество сегментов часто отличается, и количество сегментов сильно различается. не может выполняться по умолчанию. Таким образом, мы поддерживаем объединение сегментов с несколькими отношениями двумя способами, то есть объединение сегментов поддерживается, когда количество сегментов двух таблиц сегментов представляет собой множественное отношение.

В первом методе количество задач равно количеству сегментов в маленькой таблице. Как показано на рисунке ниже, таблица A содержит 3 сегмента, а таблица B — 6 сегментов. В это время набор данных корзины 0 и корзины 3 таблицы B должен быть объединен с корзиной 0 таблицы A. В этом случае можно запустить 3 Задания. Среди них Задача 0 объединяет сегмент 0 таблицы A и сегмент 0 + сегмент 3 таблицы B. Здесь необходимо выполнить еще одну сортировку слиянием данных корзины 0 и корзины 3 таблицы B, чтобы обеспечить порядок коллекции.


Если количество сегментов в таблице A и таблице B не сильно отличается, вы можете использовать описанный выше метод. Если количество сегментов в таблице B в 10 раз превышает количество сегментов в сегменте A, хотя описанный выше метод не использует Shuffle, он может быть медленнее, чем SortMergeJoin, включая Shuffle, из-за недостаточного параллелизма. В это время можно использовать другой метод, то есть количество задач равно количеству сегментов в большой таблице, как показано на следующем рисунке.



По этой схеме три сегмента таблицы А можно читать несколько раз. На приведенном выше рисунке непосредственное выполнение Bucket Union между таблицей A и таблицей A (новый оператор, аналогичный Union, но сохраняющий функцию Bucket), результат эквивалентен 6 Bucket, что совпадает с количеством Bucket таблицы. B, чтобы можно было выполнить объединение сегментов.

Улучшение третье:Поддержка перехода на BucketJoin

Раньше в компании использовалось несколько таблиц, которые использовали Bucket, но после того, как мы внесли ряд улучшений в Bucket, большое количество пользователей захотело преобразовать таблицы в таблицы Bucket. После преобразования метаинформация таблицы показывает, что таблица является таблицей Bucket, а данные в историческом разделе не распределяются в соответствии с требованиями таблицы Bucket.При запросе исторических данных Bucket не может быть идентифицирован.

В то же время из-за быстрого увеличения объема данных средний размер корзины также быстро увеличивался. Это приведет к тому, что объем данных, обрабатываемых одной задачей, будет слишком большим, и эффект от использования Bucket может быть не таким хорошим, как при непосредственном соединении на основе Shuffle.

Чтобы решить вышеуказанные проблемы, мы внедрили таблицу Bucket, которая поддерживает переход на более раннюю версию. Основной принцип заключается в том, что каждый раз, когда изменяется информация о корзине (включая два вышеупомянутых случая — преобразование таблицы, не относящейся к корзине, в таблицу корзины и изменение количества корзин), записывается дата модификации. И при принятии решения о том, какой метод соединения использовать для таблицы Bucket, сначала проверьте, содержат ли запрошенные данные только разделы после даты. Если да, то она будет рассматриваться как таблица Bucket и поддерживается Bucket Join; в противном случае она будет рассматриваться как обычная таблица без Bucket.

Улучшение четвертое:поддержка суперсет

Для общей таблицы она может соединиться с другой таблицей в соответствии с полем «Пользователь» или с другой таблицей в соответствии с полями «Пользователь» и «Приложение» и соединиться с другими таблицами в соответствии с полями «Пользователь» и «Элемент». Для собственного объединения сегментов в Spark SQL требуется, чтобы набор ключей объединения точно совпадал с набором ключей сегмента таблицы для выполнения объединения сегментов. В этом случае наборы ключей разных соединений отличаются, поэтому объединение сегментов нельзя использовать одновременно. Это сильно ограничивает применимые сценарии Bucket Join.
Чтобы решить эту проблему, мы поддерживаем Bucket Join в сценарии надмножества. Объединение сегмента может быть выполнено до тех пор, пока набор ключей соединения содержит набор ключей сегмента.

Как показано на рисунке ниже, таблица X и таблица Y разделены на сегменты в соответствии с полем A. Запрос должен объединить таблицу X и таблицу Y, а набор ключей соединения — это A и B. В настоящее время, поскольку данные A равны, а идентификаторы сегментов в двух таблицах одинаковы, идентификаторы сегментов равных данных в A и B должны быть одинаковыми в двух таблицах, чтобы распределение данных соответствовало требованиям. Присоединяйтесь и не нуждается в перемешивании. В то же время Bucket Join также должен гарантировать, что две таблицы отсортированы в соответствии с набором ключей соединения, а именно A и B. В этом случае в разделе необходимо отсортировать только таблицы X и Y. Поскольку обе стороны уже отсортированы по полю A, в настоящее время сортировка по полям A и B относительно дешева.


материализованный столбец

Когда Spark SQL обрабатывает данные вложенных типов, возникают следующие проблемы:

  • читать много ненужных данных: Для столбцовых форматов хранения, таких как Parquet/ORC, могут быть прочитаны только обязательные поля, а другие поля могут быть пропущены напрямую, что значительно экономит ввод-вывод. Для полей вложенных типов данных, таких как поле people типа Map на следующем рисунке, часто необходимо только прочитать его подполя, такие как people.age. Однако необходимо прочитать все поля людей всего типа карты, а затем извлечь поле people.age. Это вводит много бессмысленных накладных расходов на ввод-вывод. В нашем сценарии много полей типа Map, и многие из них содержат от десятков до сотен ключей, а это значит, что IO усиливается в десятки-сотни раз.
  • Невозможно выполнить векторизованное чтение: И векторизованное чтение может значительно повысить производительность. Но на данный момент (26 октября 2019 г.) Spark не поддерживает векторизованное чтение с вложенными типами данных. Это сильно влияет на производительность запросов, содержащих вложенные типы данных.
  • Выталкивание фильтра не поддерживается: В настоящее время (26 октября 2019 г.) Spark не поддерживает раскрывающийся фильтр для полей вложенного типа.

  • Повторный расчет: поле JSON, которое существует как тип String в Spark SQL и не является строго вложенным типом данных. Однако на практике это также часто используется для сохранения нескольких полей, которые не являются фиксированными.При запросе целевое подполе извлекается через JSON Path, а извлечение поля больших строк JSON потребляет много ресурсов ЦП. Для таблицы горячих точек очень расточительно часто и многократно извлекать одно и то же подполе.


    Для этой проблемы студенты, занимающиеся хранилищами данных, также придумали несколько решений. Как показано на рисунке ниже, таблица с именем sub_table создается вне таблицы с именем base_table, а часто используемое подполе people.age задается как дополнительное поле типа Integer. Нисходящий поток больше не запрашивает people.age через base_table, а вместо этого использует поле age в sub_table. Таким образом, запрос к полю вложенного типа превращается в запрос к полю примитивного типа, и одновременно решаются вышеуказанные проблемы.


    В этой схеме есть явные недостатки:
    • Поддерживается дополнительная таблица, что приводит к большим дополнительным затратам памяти/вычислений.
    • Исторические данные вновь добавленных полей не могут быть запрошены в новой таблице (если вы хотите поддерживать запрос исторических данных, вам необходимо перезапустить историческое задание, что слишком дорого и неприемлемо).
    • Сопровождающий таблицы должен изменить работу по вставке данных после изменения структуры таблицы.
    • Стороне запроса ниже по течению необходимо изменить оператор запроса, и стоимость продвижения высока.
    • Высокие эксплуатационные расходы: если изменяются часто используемые подполя, необходимо удалить независимые подполя, которые больше не нужны, и добавить новые подполя в качестве независимых полей. Перед удалением необходимо убедиться, что никакие последующие предприятия не используют это поле. Необходимо уведомить о новых добавленных полях и передать их нижестоящим бизнес-сторонам для использования новых полей.
    Чтобы решить все вышеперечисленные проблемы, мы разработали и внедрили материализованные столбцы. Его принцип:
    • Добавьте поле примитивного типа, например поле age типа Integer, и укажите, что это материализованное поле people.age.
    • При вставке данных данные автоматически генерируются для материализованного поля, а материализованное отношение сохраняется в параметре раздела. Поэтому операция вставки данных полностью прозрачна, и сопровождающему таблицы не нужно модифицировать существующую операцию.
    • При запросе проверьте все разделы требуемого запроса, если они содержат материализованную информацию (сопоставление людей.возраст с возрастом), напрямую перепишите select people.age для выбора возраста, чтобы добиться полностью прозрачной оптимизации для нижестоящей стороны запроса. Также совместим с историческими данными.

    На следующей диаграмме показаны преимущества использования материализованных столбцов в основной таблице:



    материализованное представление

    В поле OLAP трудоемкие операции, такие как Group By и Aggregate/Join, часто выполняются с определенными фиксированными полями одной и той же таблицы, что приводит к большому количеству повторяющихся вычислений, пустой трате ресурсов и влиянию на производительность запросов, что не способствует для улучшения пользовательского опыта.

    Мы реализовали функции оптимизации на основе материализованных представлений:



    Как показано на изображении выше, история запросов показывает большое количество запросов, которые группируются по пользователю, а затем суммируются или подсчитываются по количеству. На этом этапе вы можете создать материализованное представление и выполнить gorup для пользователя и avg для num (среднее значение будет автоматически преобразовано в количество и сумму). Когда пользователь выполняет запрос select user, sum(num) к исходной таблице, Spark SQL автоматически переписывает запрос в запрос select user, sum_num к материализованному представлению.

    Другие оптимизации в движке Spark SQL

    На диаграмме ниже показаны другие части нашей работы по оптимизации Spark SQL:


    Повышение стабильности Spark Shuffle и оптимизация производительности

    Проблемы с Spark Shuffle

    Принцип Shuffle, многие ученики уже должны быть знакомы с ним. Ввиду временных отношений я не буду вводить здесь слишком много деталей, а лишь кратко представлю базовую модель.


    Как показано на рисунке выше, мы называем этап Shuffle upstream Mapper Stage, а Task в нем называется Mapper. Нисходящий этап Shuffle называется этапом редуктора, а задача в нем называется редуктором.

    Каждый Mapper делит свои данные не более чем на N частей, где N — количество редукторов. Каждый редьюсер должен обратиться не более чем к M (количество картографов) картографов, чтобы получить свою часть данных.

    В этой архитектуре есть две проблемы:

    • проблемы со стабильностью: данные Mapper's Shuffle Write хранятся на локальном диске Mapper, и существует только одна копия. Когда на машине произошел сбой диска или ввод-вывод переполнен, а ЦП переполнен, Редюсер не может прочитать данные, вызывая исключение FetchFailedException, которое, в свою очередь, приводит к повторной попытке этапа. Stage Retry увеличивает время выполнения задания и напрямую влияет на SLA. В то же время, чем больше время выполнения, тем больше вероятность того, что данные Shuffle не могут быть прочитаны, что, в свою очередь, вызовет больше повторных попыток выполнения. Такой цикл может привести к сбою выполнения больших заданий.

    • проблемы с производительностью: данные каждого картографа будут считываться большим количеством редукторов, а разные части считываются случайным образом. Предполагая, что выходные данные Mapper в случайном порядке составляют 512 МБ и имеется 100 000 редукторов, средние данные, считываемые каждым редуктором, составляют 512 МБ / 100 000 = 5,24 КБ. Кроме того, разные редукторы считывают данные параллельно. Для выходного файла Mapper выполняется множество случайных чтений. Производительность произвольного ввода-вывода жесткого диска намного ниже, чем последовательный ввод-вывод. Последнее явление заключается в том, что Reducer считывает данные Shuffle очень медленно, что отражается в метриках, свидетельствующих о том, что время блокировки чтения в случайном порядке в Reducer больше, даже на его долю приходится более половины всего времени выполнения Reducer, как показано на следующем рисунке.



    Улучшение стабильности в случайном порядке на основе HDFS

    Было замечено, что самым большим фактором, вызывающим сбой в случайном порядке, является не аппаратная проблема, такая как сбой диска, а полная загрузка ЦП и полная загрузка дискового ввода-вывода.


    Как показано на рисунке выше, загрузка ЦП машины близка к 100 %, из-за чего служба Spark External Shuffle в диспетчере узлов на стороне Mapper не может вовремя предоставить службу Shuffle.

    На рисунке ниже Data Node занимает 84% ресурсов ввода-вывода всей машины, а часть дискового ввода-вывода полностью заполнена, что делает чтение данных Shuffle очень медленным, что делает сторону Reducer неспособной читать данные внутри период тайм-аута, приводящий к FetchFailedException.



    Независимо от причины, суть проблемы в том, что данные случайной записи на стороне Mapper хранятся только локально.Как только проблема возникает на узле, все данные произвольной записи на узле не могут быть прочитаны Reducer. Общее решение этой проблемы — обеспечить доступность с помощью нескольких реплик.

    Первоначальное простое решение заключается в том, что окончательные файлы данных и индексные файлы на стороне Mapper не записываются на локальный диск, а напрямую записываются в HDFS. Редуктор больше не считывает данные Shuffle через внешнюю службу Shuffle на стороне Mapper, а напрямую получает данные из HDFS, как показано на следующем рисунке.



    После быстрой реализации этого решения мы провели несколько простых наборов тестов. Результаты показывают:

    • Когда не так много картографов и редукторов, производительность чтения и записи Shuffle ничем не отличается от исходного решения.
    • Когда есть много картографов и редукторов, чтение в случайном порядке становится очень медленным.




      Во время вышеописанного эксперимента HDFS выдала тревожное сообщение. Как показано на рисунке ниже, пиковое количество запросов в секунду прокси-сервера имен HDFS достигает 600 000. (Примечание: ByteDance разработала прокси-сервер имени узла и реализовала кеш на уровне прокси, поэтому чтение QPS может поддерживать этот порядок величины).



      Причина в том, что в общей сложности 10 000 редукторов, файлы данных и индексные файлы должны быть прочитаны из 10 000 картографов, и в общей сложности необходимо прочитать 10 000 * 1 000 * 2 = 200 миллионов HDFS.

      Если это всего лишь проблема с производительностью Name Node, существует несколько простых способов ее решения. Например, местоположение блока всех картографов сохраняется на стороне драйвера Spark, а затем драйвер транслирует информацию всем исполнителям.Каждый редуктор может напрямую получить расположение блока от исполнителя, а затем напрямую считывает данные из узла данных. без подключения к Name Node. Однако, учитывая многопоточную модель Data Node, это решение окажет большее влияние на Data Node.

      В итоге мы выбрали относительно простое и выполнимое решение, как показано на следующем рисунке.



      Выходные данные Mapper's Shuffle по-прежнему записываются на локальный диск по оригинальной схеме, а после записи загружаются в HDFS. Редуктор по-прежнему считывает данные Shuffle через External Shuffle Service на стороне Mapper по исходной схеме. Если это не удается, прочитайте из HDFS. Эта схема значительно снижает частоту доступа к HDFS.

      Программа существует уже почти год:

      • Охватывает более 57% данных Spark Shuffle.
      • Это повышает общую производительность заданий Spark на 14%.
      • Производительность крупномасштабных операций увеличена на 18%.
      • Почасовая производительность труда увеличилась на 12%.


      Это решение направлено на повышение стабильности Spark Shuffle и, таким образом, на повышение стабильности работы, но в конечном итоге не использует такие показатели, как дисперсия, для измерения улучшения стабильности. Причина в том, что нагрузка на кластер каждый день разная, и общая дисперсия велика. После улучшения стабильности Shuffle число повторных попыток значительно сокращается, а общее время выполнения задания сокращается, то есть повышается производительность. Наконец, повышение производительности сравнивается путем сравнения общего времени выполнения задания до и после использования схемы для измерения эффекта схемы.

      Практика и исследование оптимизации производительности в случайном порядке

      Как проанализировано выше, причина проблемы с производительностью Shuffle заключается в том, что Shuffle Write выполняется Mapper, а затем Reducer должен считывать данные со всех Mapper. Эту модель мы называем Mapper-centric Shuffle. Проблема с ним заключается в следующем:

      • На стороне Mapper будет M операций ввода-вывода с последовательной записью.
      • На стороне картографа будет M * N * 2 операций ввода-вывода со случайным чтением (это самое большое узкое место в производительности).
      • Внешняя служба Shuffle на стороне Mapper должна располагаться на том же компьютере, что и Mapper, который не может эффективно разделить хранилище и вычисления, а службу Shuffle нельзя расширять независимо.
      В ответ на вышеуказанные проблемы мы предлагаем схему Shuffle, основанную на Reducer, которая разделяет хранение и вычисления, как показано на следующем рисунке.




      Принцип этого решения заключается в том, что Mapper напрямую записывает данные, принадлежащие разным редюсерам, в разные сервисы Shuffle. На приведенном выше рисунке всего 2 картографа, 5 редукторов и 5 сервисов перетасовки. Все преобразователи удаленно передают данные, принадлежащие Reducer 0, в Shuffle Service 0, который последовательно записывает их на диск. Редюсеру 0 нужно только последовательно прочитать все данные из Shuffle Service 0, и ему не нужно извлекать данные из M Mappers. Преимущества этой схемы:
      • Превратите M * N * 2 случайных операций ввода-вывода в N последовательных операций ввода-вывода.
      • Службу Shuffle можно развернуть независимо от Mapper или Reducer, чтобы добиться независимого расширения и разделения хранилища и вычислений.
      • Служба Shuffle может напрямую хранить данные в хранилище с высокой доступностью, таком как HDFS, что одновременно решает проблему стабильности Shuffle.
      Вот и все, что я поделился, спасибо всем.

      Основные моменты контроля качества

      - Вопрос: Добавьте новый столбец в материализованный столбец, вам нужно изменить исторические данные?

      Ответ: Исторических данных слишком много, и изменять исторические данные нецелесообразно.

      - Вопрос: Что делать, если запрос пользователя содержит как новые данные, так и исторические данные?

      Ответ: Вообще говоря, данные, измененные пользователем, находятся в единице раздела. Поэтому мы сохраняем информацию о материализованном столбце в параметре раздела. Если запрос пользователя содержит как новый раздел, так и исторический раздел, мы выполним SQL Rewrite для материализованного столбца в новом разделе, но не в историческом разделе, а затем объединим новый и старый разделы, чтобы обеспечить правильность как можно больше данных. Воспользуйтесь преимуществами материализованных столбцов.

      - Вопрос: Здравствуйте, вы сделали много ценных оптимизаций для пользовательских сценариев. Подобно материализованным столбцам и материализованным представлениям, все они должны быть установлены в соответствии с шаблоном запроса пользователя. Вы сейчас анализируете эти запросы вручную или есть какой-то механизм для их автоматического анализа и оптимизации?

      Ответ: В настоящее время мы в основном используем некоторую аудиторскую информацию для облегчения ручного анализа. В то же время мы также делаем рекомендательные службы для материализованных столбцов и материализованных представлений и, наконец, добиваемся интеллектуального построения материализованных столбцов и материализованных представлений.

      - Вопрос. Может ли только что представленное решение для повышения стабильности Spark Shuffle на основе HDFS асинхронно загружать данные Shuffle в HDFS?

      Ответ: Идея хорошая, мы ее уже рассматривали, но по ряду соображений так и не сделали в итоге. Во-первых, объем выходных данных Shuffle для одного Mapper, как правило, невелик, и загрузка в HDFS занимает менее 2 секунд, и на это время можно не обращать внимания.Во-вторых, мы широко используем службу External Shuffle Service и Dynamic Allocation. Mapper выполняется, Executor может быть Recycling, если вы хотите загружать асинхронно, вы должны полагаться на другие компоненты, что увеличит сложность и снизит ROI.

      Еще больше чудесного обмена

      Обзор Шанхайского салона | Как ByteDance оптимизировала платформу HDFS с 10 000 узлов

      Обзор салона в Шанхае | Введение в принципы Apache Kylin и совместное использование новой архитектуры (Kylin On Parquet)

      Обзор салона в Шанхае | Применение Redis Cache в сценариях больших данных

      Салон технологий ByteDance

      Технологический салон ByteDance — это мероприятие по обмену техническими знаниями, инициированное Технологическим институтом ByteDance и совместно спонсируемое Технологическим институтом ByteDance и Технологическим сообществом Nuggets.

      Технологический салон ByteDance приглашает технических экспертов из ByteDance и интернет-компаний отрасли для обмена горячими техническими темами и передовым практическим опытом, охватывающим архитектуру, большие данные, интерфейс, тестирование, эксплуатацию и обслуживание, алгоритмы, системы и другие технические области. .

      ByteDance Technology Salon стремится предоставить открытую и бесплатную платформу обмена и обучения для талантов в технической области, чтобы помочь техническим специалистам учиться, расти и продолжать развиваться.

      Добро пожаловать в "Техническую команду ByteDance"