Можно сказать, что Spark sql является сущностью Spark. Я чувствую, что общая сложность более чем в 5 раз выше, чем у Spark Streaming. Теперь Spark официально продвигает структурированный Streaming, а Spark Streaming активно не поддерживается. Мы строим задачи вычислений больших данных на базе spark., фокус тоже надо сместить на DataSet.Мигрируется исходный код написанный на основе RDD, польза очень большая, особенно по производительности, есть качественное улучшение, различные встроенные оптимизации производительности в spark sql есть лучше, чем голое письмоДля RDD более надежно соблюдать различные так называемые передовые практики, особенно для новичков.Например, некоторые передовые практики относятся сначала к операциям фильтрации, а затем к операциям сопоставления.Этот тип искры sql автоматически выталкивает предикаты , например, старайтесь избегать использования Для операции в случайном порядке, если вы включите соответствующую конфигурацию в spark sql, широковещательное соединение будет автоматически использоваться для трансляции небольших таблиц, случайное соединение будет преобразовано в соединение с картой и т. д., что действительно может сэкономить нам много беспокойства.
Сложность кода spark sql вызвана существенной сложностью проблемы.Большая часть логики платформы Catalyst в spark sql заключается в выполнении различных подбрасываний в структуре данных древовидного типа.Это очень элегантно реализовать на основе scala.Частично Функция и мощное регулярное сопоставление case делают весь код понятным.Эта статья кратко описывает некоторые механизмы и концепции в spark sql.
SparkSession — это точка входа для написания кода приложения spark. Запуск оболочки spark предоставит вам создание SparkSession. Этот объект является отправной точкой всего приложения spark. Давайте рассмотрим некоторые важные переменные и методы sparkSession. :

Упомянутый выше sessionState очень важен. Он поддерживает все данные о состоянии, используемые текущим сеансом. Есть следующие вещи, которые необходимо поддерживать:
Spark sql внутренне использует dataFrame и Dataset для представления набора данных, а затем вы можете применять к этому набору данных различные статистические функции и операторы.Некоторые люди могут не знать разницу между DataFrame и набором данных.На самом деле DataFrame является типом Row DataSet ,
type DataFrame = Dataset[Row]
Упомянутый здесь тип Row находится на уровне API, предоставляемом Spark sql. Однако DataSet не требует, чтобы тип ввода был Row, это также могут быть строго типизированные данные. Тип данных, обрабатываемый в нижней части DataSet, — это Катализатор внутреннего типа InternalRow или UnsafeRow. За ним стоит Encoder для неявного преобразования, который преобразует ваши входные данные во внутренний InternalRow, поэтому вывод DataFrame соответствует RowEncoder.
Выполнение операции преобразования в наборе данных приведет к созданию древовидной структуры с элементом типа LogicalPlan.Давайте возьмем пример.Если у меня есть таблица учащихся и таблица оценок, требование состоит в том, чтобы подсчитать общие баллы всех учащихся старше 11 лет. Старый.

Это queryExecution является механизмом выполнения всего плана выполнения, который содержит различные промежуточные переменные процесса в процессе выполнения.Весь процесс выполнения выглядит следующим образом

Тогда оператор sql в нашем примере выше станет абстрактным синтаксическим деревом после анализа синтаксическим анализатором, а соответствующий проанализированный логический план AST будет

Рисунок немного с изображением для представления

Мы видим, что условие фильтра становится узлом Filter, который имеет тип UnaryNode, то есть есть только один дочерний элемент, а данные в двух таблицах становятся узлом UnresolvedRelation, который имеет тип LeafNode. является узлом BinaryNode, имеет двух дочерних элементов.
Упомянутые выше узлы относятся к типу LogicalPlan, который можно понимать как операторы, выполняющие различные операции.Spark SQL определяет различные операторы для различных операций.
Абстрактное синтаксическое дерево, состоящее из этих операторов, является основой всей оптимизации Catatyst.Оптимизатор Catatyst будет выполнять различные подбрасывания в этом дереве и перемещать узлы дерева для оптимизации.
Теперь у меня есть абстрактное синтаксическое дерево через Parser, но я не знаю, что такое счет и сумма, поэтому мне нужен анализатор, чтобы найти его. Анализатор преобразует все неразрешенные вещи в AST в разрешенное состояние. правила разрешения, все из которых хорошо понятны, например, ResolverRelations предназначен для анализа основных типов таблиц (столбцов) и другой информации, ResolveFuncions — это основная информация анализируемой функции, такая как функция суммы в примере, ResolveReferences могут быть не совсем понятны, поля, которые мы используем в операторах sql, таких как Имя в Select name соответствует переменной, которая существует как переменная (тип Attribute) при анализе таблицы, затем та же переменная, соответствующая узлу Project, соответствующему Select становится ссылкой, и у них один и тот же идентификатор, поэтому после обработки ResolveReferences он становится типом AttributeReference, гарантируя, что им присваивается одно и то же значение, когда данные фактически загружаются в конце, точно так же, как мы определяем переменную, когда мы при написании кода эти правила действуют на узел многократно. , указанный узел дерева имеет тенденцию быть стабильным. Конечно, если количество оптимизаций слишком много, производительность будет потеряна, поэтому некоторые правила работают один раз, а некоторые правила работают с фиксированной точкой, что есть выбор. Что ж, без лишних слов, давайте проведем небольшой эксперимент.
Мы используем ResolverRelations для синтаксического анализа нашего AST. После синтаксического анализа мы видим, что исходное UnresolvedRelation стало LocalRelation, которое представляет собой таблицу в локальной памяти. Эта таблица регистрируется в каталоге, когда мы используем createOrReplaceTempView. Эта операция relove не что иное, как It заключается в том, чтобы найти таблицу в каталоге, найти схему таблицы и проанализировать соответствующие поля, преобразовать каждое StructField, определенное внешним пользователем, в AttibuteReference и пометить его идентификатором.

Давайте снова воспользуемся ResolveReferences, и вы обнаружите, что одни и те же поля в верхнем узле стали ссылками с тем же идентификатором, и все их типы — AttibuteReference. После окончательного применения всех правил весь AST становится

Ключевым моментом является следующее.Нам необходимо оптимизировать логику.Давайте рассмотрим оптимизацию логики:

В sparksql есть много видов логической оптимизации.Большая часть логики среды Catalyst в sparksql заключается в выполнении различных подбрасываний в структуре данных древовидного типа.Это очень элегантно реализовать на основе scala.Частичная функция scala соответствует мощной case регулярность. , чтобы весь код выглядел понятно, не будем нести чушь, а проведем небольшой эксперимент.
Вы видели это, заменил мой (100 + 10) на 110.

Используйте PushPredicateThroughJoin, чтобы применить фильтр, который отфильтровывает одну пару таблиц перед соединением, что загрузит много данных и оптимизирует производительность Давайте взглянем на окончательный вид.

По крайней мере, я использовал методы оптимизации логики ColumnPruning, PushPredicateThroughJoin, ConstantFolding, RemoveRedundantAliases, и теперь мое маленькое дерево выглядит так:
После того, как логическая оптимизация выполнена, это всего лишь абстрактный логический уровень, и его необходимо сначала преобразовать в физический план выполнения, чтобы логически выполнимый план выполнения можно было изменить в план, который Spark может фактически выполнить.

spark sql преобразует логические узлы в соответствующие физические узлы, такие как оператор Join.Spark формулирует разные алгоритмические стратегии для этого оператора в соответствии с разными сценариями, включая BroadcastHashJoin, ShuffleHashJoin, SortMergeJoin и т. д. Конечно, в нем много моментов оптимизации. При конвертации он будет разумно выбран в соответствии с некоторыми статистическими данными, что включает в себя оптимизацию на основе затрат, что также является большой частью.Вы можете открыть статью позже.В нашем примере, поскольку объем данных составляет менее 10 М, это будет автоматически переключаться на BroadcastHashJoin, зоркие учащиеся могут видеть, что узлов стало больше. Объясним, что узел BroadcastExchange наследует класс Exchage и используется для обмена данными между узлами. Здесь BroadcastExchange предназначен для передачи данных из LocalTableScan каждому узлу-исполнителю, используемому для соединения на стороне карты. Заключительная операция агрегирования делится на два этапа. Первый этап — выполнение параллельного агрегирования, а затем — окончательное агрегирование агрегированных результатов. добавление посередине.Обмен хэш-разделами, это делается для того, чтобы один и тот же ключ перетасовывался в один и тот же раздел.Когда распределение дочерних выходных данных текущего физического плана не соответствует требованиям, необходимо выполнить перетасовку.Это это узел данных обмена, вставленный на заключительном этапе SureRequirement. В этой области есть поговорка, что тот, кто выигрывает соединение, выигрывает мир. Мы сосредоточимся на некоторых компромиссах, которые искра sql делает в операции соединения.
Операция соединения может в основном разделить две таблицы соединения на большую и маленькую таблицы. Большая таблица используется в качестве таблицы потокового обхода, а маленькая таблица используется в качестве таблицы поиска. Затем для каждой записи в большой таблице , поиск осуществляется по Ключу, в таблице брать записи с таким же ключом.
spark поддерживает все типы соединений:

Операция соединения в Spark SQL выбирает различные стратегии соединения в соответствии с различными условиями, которые делятся на BroadcastHashJoin, SortMergeJoin и ShuffleHashJoin.
-
BroadcastHashJoin: если spark определяет, что объем памяти таблицы меньше порогового значения широковещательной рассылки (параметр spark.sql.autoBroadcastJoinThreshold используется в Spark для управления пороговым значением для выбора BroadcastHashJoin, значение по умолчанию равно 10 МБ), он должен транслировать небольшой таблицу в Executor, а затем поместите небольшую таблицу в хеш-таблицу, которая используется в качестве таблицы поиска, и операция соединения может быть выполнена с помощью операции отображения, избегая операции перемешивания с относительно большим кодом производительности.Однако следует отметить что BroadcastHashJoin не поддерживает полное внешнее соединение.Для правого внешнего соединения транслируется левая таблица, для левого внешнего соединения, левое полусоединение, левое антисоединение, транслируется правая таблица, для внутреннего соединения передается меньшая таблица.
-
SortMergeJoin: если данные в двух таблицах большие, лучше использовать SortMergeJoin. SortMergeJoin использует операцию перемешивания для перемешивания записей одного и того же ключа в один раздел, после чего две таблицы уже отсортированы. Также допустимо.
-
ShuffleHashJoin: Это означает, что сортировка не выполняется во время процесса перемешивания, а таблица поиска помещается в хеш-таблицу для поиска и объединения Когда будет выполняться ShuffleHashJoin? Размер таблицы поиска должен превышать значение spark.sql.autoBroadcastJoinThreshold, в противном случае используется BroadcastHashJoin, а средний размер каждой секции не может превышать spark.sql.autoBroadcastJoinThreshold. Это гарантирует, что таблицу поиска можно разместить в памяти без OOM. Другое условие состоит в том, что большая таблица более чем в 3 раза больше, чем маленькая таблица, чтобы можно было использовать преимущества этого типа соединения.
Как упоминалось выше, узлы выше AST были преобразованы в физические узлы.Эти физические узлы в конечном итоге рекурсивно вызывают метод выполнения из головного узла, который вызовет операцию преобразования на RDD, сгенерированную дочерним элементом, который сгенерирует цепочку RDD. chain, за которой следует искра. Внутри вышеприведенного DStream рекурсивно вызовите это. Окончательное выполненное изображение выглядит следующим образом:

Можно видеть, что окончательное выполнение разделено на два этапа: небольшая таблица broadcastExechage переносится в большую таблицу для выполнения BroadcastHashJoin без операции перемешивания эволюции, а затем на последнем этапе агрегации функция суммирования HashAggregate сначала выполняется в сегмент карты, а затем операция Exchage перемешивает данные того же ключа в тот же раздел в соответствии с именем, а затем выполняет окончательную операцию суммирования HashAggregate.Странно иметь здесь WholeStageCodegen.Что это делает, потому что когда мы выполняем фильтр, проецируем эти операторы, эти оператор содержит много выражений, таких как SELECT сумма (v), имя, где сумма и v оба являются выражениями, где v является выражением переменной атрибута, а выражение также является древовидной структурой данных , sum(v) — это узел суммы и древовидная структура, состоящая из дочерних узлов v суммы. Эти выражения могут быть оценены и сгенерированы кодом. Самая основная функция выражений — оценить и вычислить входную строку. Выражение необходимо реализовать def eval(input: InternalRow = null): любая функция для реализации своей функциональности.
Выражение предназначено для обработки Row, и выходные данные могут быть любого типа, но тип вывода Plan для Project и Filter — это вывод по умолчанию: Seq[Attribute], который представляет собой набор переменных, таких как Filter (возраст >= в наш пример) 11) В этом плане age>11 является выражением Это > выражение зависит от двух дочерних узлов.Литеральное константное выражение оценивается как 11, а другое является выражением переменной атрибута age, которое изменяется на этапе анализа. Для типа AttributeReference, но это Unevaluable, чтобы получить соответствующее значение атрибута во входной строке, необходимо привязать индекс этой переменной к строке данных в соответствии со ассоциацией схемы для создания BoundReference, и тогда выражение BoundReference eval.Значение в Row можно получить по индексу. Окончательный тип вывода выражения age>11 имеет тип boolean, но тип вывода плана фильтра имеет тип Seq[Attribute].
Вполне возможно, что данные передаются в планах один за другим, а затем выражения в каждом плане будут обрабатывать данные, что эквивалентно обработке небольших вызовов функций, и возникает много накладных расходов на вызовы функций, тогда можем ли мы встроить эти маленькие функции в большую функцию?WholeStageCodegen делает это.

Вы можете видеть, что перед каждым узлом окончательного плана выполнения есть знак *, указывающий, что генерация всего кода включена.В нашем примере Filter, Project, BroadcastHashJoin, Project и HashAggregate включают генерацию всего кода, cascade Для двух больших функций, если вам интересно, вы можете использовать a.queryExecution.debug.codegen, чтобы увидеть, как выглядит сгенерированный код. Однако оператор Exchange не реализует полную генерацию кода, поскольку ему необходимо отправлять данные по сети.
На этом я заканчиваю сегодняшнюю беседу, на самом деле в spark sql есть много интересного, но из-за существенной сложности задачи требуется высокая степень абстракции, чтобы все это выпрямить, что затрудняет код читатели, чтобы понять, но если вы действительно посмотрите на это, вы получите много. Если у вас есть какие-либо идеи по этой статье, пожалуйста, оставьте сообщение в конце статьи, чтобы выразить свои мысли.
коровы говорят
Рубрика «Niu Ren Shuo» посвящена открытию мыслей технических людей, включая техническую практику, технические галантерейные товары, технические идеи, опыт роста и все, что заслуживает открытия. Мы надеемся собрать лучших техников, чтобы задействовать уникальные, острые и современные голоса.
Электронная почта для отправки: marketing@qiniu.com