Как работает искра

Архитектура исходный код Spark

адрес блога:Joey771. Талант/2018/10/25/…

Принцип работы искры — вопрос, который часто задают в процессе собеседования на должности разработчиков больших данных.Когда мне впервые задали этот вопрос, я немного растерялся.Как мне ответить на такой большой вопрос? Собираетесь ли вы описать архитектурную композицию искры или рассказать о лежащих в ее основе деталях вызова? Позже я нашел некоторую информацию, и после прочтения некоторых книг у меня есть некоторое понимание этого вопроса.На самом деле человек, который задал этот вопрос, может хотеть, чтобы мы ответили на детали процесса запуска Spark.Опишите, какие шаги были предприняты во время выполнение. Если вы можете добавить некоторые пояснения к деталям базового исходного кода Spark в процессе описания, это произведет хорошее впечатление на спрашивающего, который будет думать, что вы не только продолжаете использовать Spark, но и имеете некоторое представление о принципах базовый исходный код.

Кратко опишите, как работает Spark

После того, как пользователь отправляет задание с помощью spark-submit, сначала запускается процесс драйвера, который обращается к диспетчеру кластера (автономному, YARN, Mesos) за ресурсами, необходимыми для этой операции (здесь ресурсы включают ядро ​​и памяти, которую можно найти в наборе в параметрах spark-submit), менеджер кластера запустит экзекьютор на каждой ноде по нужным нам параметрам. После подачи заявки на соответствующий ресурс процесс драйвера начнет планировать и выполнять написанный нами код задания. Задание будет отправлено в DAGScheduler, и DAGScheduler разделит задание на несколько этапов в соответствии с зависимостями RDD в задании.Принцип разделения основан на наличии широкой зависимости, и каждый этап будет содержать столько последовательных узких, сколько возможно. полагаться. Каждый этап содержит часть задания, и TaskSet будет сгенерирован и отправлен базовому планировщику TaskScheduler, а TaskScheduler отправит TaskSet в кластер для выполнения исполнителем. Разделение задач основано на разделе данных, а раздел делится на задачу. Этот цикл повторяется до тех пор, пока не будет выполнена вся логика кода написанной программы-драйвера и не будут вычислены все данные.

Простой процесс работы выглядит следующим образом:

Рис. 1. Процесс запуска искры

SparkContext

Весь рабочий процесс программы Spark вращается вокруг программы драйвера искры. Наиболее важной частью программы драйвера искры является SparkContext. Инициализация SparkContext заключается в подготовке рабочей среды приложения Spark. Менеджер кластера подает заявку на ресурсы , назначает и контролирует задачи и т. д.

Архитектура между драйвером и воркером показана на рисунке ниже: драйвер отвечает за распределение задач воркеру, а воркер возвращает обработанный результат драйверу.

Рис. 2. Архитектура драйвера

Основная функция SparkContext заключается в инициализации основных компонентов, необходимых для запуска приложения Spark, включая планировщик высокого уровня DAGScheduler, планировщик нижнего уровня TaskScheduler и коммуникационный терминал планировщика SchedulerBackend. зарегистрировать программу на Мастере. СДР в приложениях Spark создаются SparkContext, например, с помощью таких API, как SparkContext.textFile() и SparkContext.parallel(). Заявка на вычислительные ресурсы в Cluster Manager, упомянутая в запущенном процессе, также запрашивается объектами, сгенерированными SparkContext. Далее давайте узнаем о SparkContext с точки зрения исходного кода.Что касается различных компонентов, созданных SparkContext, в классе SparkContext есть такой фрагмент кода для создания этих компонентов:

DAGScheduler

DAGScheduler – это высокоуровневый планировщик, который может разделить каждый RDD группы DAG на разные этапы и построить отношения "родитель-потомок" между этими этапами. Наконец, каждый этап делится на несколько задач в соответствии с разделом и отправляется на нижний уровень в форма TaskSet Scheduler TaskScheduler. Разделение стадий основано на том, существует ли широкая зависимость в зависимости RDD. Широкая зависимость относится к разделу в родительском RDD, от которого зависят несколько разделов дочернего RDD. Короче говоря, исходящая степень раздела родительского СДР больше 1. Точно так же узкая зависимость означает, что от раздела родительского СДР зависит только раздел дочернего СДР, то есть исходящая степень раздела родительского СДР равна 1. Каждый этап будет содержать как можно больше узких зависимостей, и каждый узко зависимый оператор будет формировать для запуска целый конвейер, что может уменьшить чтение и запись RDD между каждым оператором, в отличие от MapReduce, каждое задание содержит только одну задачу Map A и Задача Reduce, следующая задача Map должна дождаться завершения предыдущей задачи Reduce, прежде чем она сможет быть выполнена.Во время выполнения формы конвейера нет перетасовки, и очевидно, что эффективнее выполнять их вместе. Между этапом и этапом будет перетасовка, здесь перетасовка также часто исследуется, и в других статьях будет подробно объяснено. DAGScheduler также должен записывать, какие RDD хранятся на диске и другие материализованные действия, и в то же время ищет оптимальное планирование задач, таких как локальность данных в Stage. DAGScheduler также должен отслеживать возможные сбои, вызванные перетасовкой выходных данных между узлами.Если он обнаружит, что этап терпит неудачу, он может повторно отправить этап.

Специфический процесс вызова DAGScheduler

Когда задание отправлено, DAGScheduler начнет свою работу. Отправка задания в spark запускается действием RDD. Когда происходит действие, метод действия в RDD вызывает метод runJob своего SparkContext после нескольких перегрузок После этого будет вызван метод runJob DAGScheduler. В классе DAGScheduler runJob является функцией входа для отправки задания.Метод submitJob вызывается, чтобы вернуть JobWaiter для ожидания результата планирования задания, а затем распечатать соответствующую информацию журнала результатов в зависимости от успеха или неудачи задания. .

Метод submitJob получит jobId и проверит, существуют ли разделы, и отправит объект JobSubmitted класса case в eventProcessLoop.Объект JobSubmitted инкапсулирует jobId, последний RDD, функцию, которая работает с RDD, и какие разделы необходимо вычислить. . В eventProcessLoop есть поток eventThread, который является потоком демона, который используется для получения объекта JobSubmitted, отправленного в поток через метод post, помещения его в одну из блокирующих очередей eventQueue для обработки, а событие берется из eventQueue будет вызывать метод onReceive (метод реализуется с помощью eventProcessLoop), а метод doOnReceive будет вызываться в методе onReceive для выполнения различной обработки в соответствии с разными типами событий. При чтении исходного кода здесь может возникнуть вопрос, а почему бы не вызвать doOnReceive напрямую, когда DAGScheduler вызывает submitJob для обработки задания, зачем нужно запускать новый поток для обработки, и отправлять сообщения самому себе для обработки (eventProcessLoop — это внутренний DAGScheduler объект). На самом деле это асинхронный метод связи потока, просто отправьте сообщение в форме связи потока (метод связи потока здесь фактически использует блокирующую очередь) в другой поток, метод submitJob может немедленно вернуться без блокировки в процессе обработки событие. Здесь мы не должны просто думать, что DAGScheduler отправляет сообщения самому себе. На самом деле есть другие компоненты, которые будут отправлять сообщения DAGScheduler. Этот метод обработки сообщений с использованием потока демона может объединить два и обработать два. Логика такова. последовательный, и масштабируемость очень хорошая.Используя циркулятор сообщений, все сообщения могут быть обработаны единообразно, гарантируя, что бизнес-логика обработки непротиворечива. EventProcessLoop здесь может обрабатывать различные сообщения, а не только JobSubmitted.Вы можете увидеть следующие события в исходном коде:

  1. JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  2. MapStageSubmitted(jobId, dependency, callSite, listener, properties)
  3. StageCancelled(stageId, reason)
  4. JobCancelled(jobId, reason)
  5. JobGroupCancelled(groupId)
  6. AllJobsCancelled
  7. ExecutorAdded(execId, host)
  8. ExecutorLost(execId, reason)
  9. WorkerRemoved(workerId, host, message)
  10. BeginEvent(task, taskInfo)
  11. SpeculativeTaskSubmitted(task)
  12. GettingResultEvent(taskInfo)
  13. completion
  14. TaskSetFailed(taskSet, reason, exception)
  15. ResubmitFailedStages

JobSubmitted вызовет метод handleJobSubmitted в DAGScheduler.Этот метод является началом построения этапа. Он создаст последний этап этапа — ResultStage, а остальные этапы — ShuffleMapStage. Создание ResultStage выполняется с помощью функции createResultStage, в которой метод getOrCreateParentStage получит или создаст список родительских этапов данного RDD. Этот метод — это метод, который мы упоминали ранее, чтобы специально разделить этапы. Исходный код этого метода очень просто. следующим образом:

Функция getShuffleDependencies вызывается для возврата зависимостей прямого перемешивания в родительском узле данного RDD.Исходный код выглядит следующим образом:

Здесь три основных структуры данных, два HashSet — родительский и посещенный, и Stack — waitingForVisit.В коде входящий RDD сначала добавляется в waitForVisit для доступа к стеку.Здесь мы также используем стеки.Видно, что это - это стратегия поиска в глубину. Посещенный используется для записи посещенных узлов, чтобы гарантировать, что они не будут повторно посещены. Затем различаются зависимости посещенных RDD. Если это shuffleDep (т. е. широкие зависимости), зависимости будут быть добавлены.Родители, если это зависимость (узкая зависимость), добавить зависимый RDD в ожиданиеForVisit для обхода поиска в глубину.Здесь родители в конечном итоге будут возвращены.Результатом является то, что все записи в родителях являются shuffleDep, то есть , зависимости между двумя этапами. Затем вызовите getOrCreateShuffleMapStage, чтобы сгенерировать ShuffleMapStage в соответствии с полученной зависимостью от перемешивания, а сгенерированный ShuffleMapStage будет сохранен в HashMap shuffleIdToMapStage. Если созданный ShuffleMapStage уже существует в структуре данных, он будет возвращен напрямую. Если он не существует, createShuffleMapStage будет вызываться для его создания.GetMissingAncestorShuffleDependencies будет вызываться для поиска зависимостей перемешивания предков, и сначала будут созданы зависимые этапы. После создания этапа handleJobSubmitted вызовет submitStage для отправки finalStage, а submitStage сначала рекурсивно отправит родительский этап.Родительский этап получается через getMissingParentStages и сортируется в соответствии с идентификатором этапа, а этап с меньшим идентификатором будет подан первым.

конкретные примеры

На следующем рисунке показана диаграмма преобразования 5 RDD.Предполагая, что RDD E, наконец, начинает действие (например, сбор), тогда в соответствии с взаимосвязью на рисунке я подробно объясню процесс генерации DAGScheduler для Stage.

  • Метод RDD.collect запустит метод SparkContext.runJob, затем вызовет метод DAGScheduler.runJob, а затем вызовет метод submitJob для инкапсуляции события в событие JobSubmitted для обработки, вызовет handleJobSubmitted и вызов createResultStage в этом методе.
  • createResultStage создаст ResultStage на основе jobId (rdd в ResultStage — это RDD, с которого началось действие, то есть finalRDD). Вызовите getOrCreateResultStages, чтобы создать все родительские этапы, вернуть parent: List[Stage] в качестве родительского этапа, передать родительские этапы в ResultStage и создать экземпляр для создания ResultStage. На схематической диаграмме RDD E вызывает createResultStage, получает этапы Stage1 и Stage2 с помощью getOrCreateResultStages, а затем создает собственный этап Stage3.
  • Метод getShuffleDependencies в методе getOrCreateParentStages получит все наборы прямых зависимостей RDD E, RDD B и RDD D, а затем вызовет getOrCreateShuffleMapStage для этих двух RDD соответственно. Эти два этапа ShuffleMapStage и, наконец, использовать эти два этапа в качестве родительских этапов для этапа 3 для создания этапа 3.
  • После этого будет вызвана submitStage в handleJobSubmitted для отправки этапа.При отправке он будет возвращен с конца на передний план.Предыдущий этап будет отправлен первым, а этап с меньшим id будет отправлен первым в соответствии с идентификатор этапа. Последний этап зависит от предыдущего этапа. , последующий этап будет рассчитан только после расчета предыдущего этапа.

SchedulerBackend и TaskScheduler

Упомянутые ранее TaskScheduler и SchedulerBackend — это всего лишь трейты. Конкретным классом реализации TaskScheduler является TaskSchedulerImpl, а подклассы SchedulerBackend включают:

  1. LocalSchedulerBackend
  2. StandaloneSchedulerBackend
  3. CoarseGrainedSchedulerBackend
  4. MesosCoarseGrainedSchedulerBackend
  5. YarnSchedulerBackend

Разные SchedulerBackends соответствуют разным режимам работы Spark. Различные основные параметры, переданные в createTaskScheduler, будут выводить разные SchedulerBackends, где spark фактически генерирует разные SchedulerBackends путем регулярного сопоставления на основе строк, переданных мастером. Здесь используется режим стратегии в режиме разработки, и разные подклассы SchedulerBackend создаются в соответствии с различными потребностями.Если используется локальный режим, будет создан LocalSchedulerBackend, а в автономном режиме кластера будет создан StandaloneSchedulerBackend. В StandaloneSchedulerBackend есть важный метод start, который сначала вызывает метод start своего родительского класса, а затем определяет команду объекта Command, в которой основным классом объекта является org.apache.spark.executor.CoarseGrainedExecutorBackend, этот класс очень важен , мы При запуске искрового приложения вы увидите процесс JVM с именем CoarseGrainedExecutorBackend на рабочем узле. Процесс здесь можно понимать как процесс исполнителя. Мастер отправляет рабочему указание запустить все процессы исполнителя. Класс входа, в котором загружается метод Main, имеет вид This CoarseGrainedExecutorBackend, который запускает исполнитель в CoarseGrainedExecutorBackend.Исполнитель выполняет задачи одновременно, создавая пул потоков, а затем вызывает свой метод запуска. В методе запуска также создается очень важный объект StandaloneAppClient, и будет вызываться его метод запуска, в этом методе будет создан объект ClientEndpoint, который является RpcEndPoint и будет зарегистрирован в Мастере.

SchedulerBackend на самом деле управляется TaskScheduler.Метод инициализации TaskScheduler вызывается в методе createTaskScheduler, а SchedulerBackend вводится как параметр для связывания отношений между ними.

Метод initialize также создаст пул для первоначального определения режима распределения ресурсов. Режим планирования. По умолчанию используется режим «первым пришел – первым вышел» (FIFO), а поддерживаемым режимом является справедливый (FAIR) режим. Режим FIFO означает, что тот, кто отправит задачу первым, выполнит ее первым, а более поздние задачи должны дождаться выполнения предыдущей задачи. Режим FAIR поддерживает группировку задач в пуле планирования. Различные пулы планирования имеют разный вес, и задачи могут выполняться в соответствии с их весами в порядке старшинства.

Основная задача TaskScheduler состоит в том, чтобы передать TaskSet вычислительной операции кластера и сообщить о результате. Мы знаем, что DAGScheduler, упомянутый ранее, разделит задачи на ряд этапов, и каждый этап будет инкапсулировать набор задач, и эти наборы задач будут переданы базовому планировщику TaskScheduler для последовательного выполнения. TaskSet, полученный TaskScheduler, передается из метода submitMissingTasks в DAGScheduler. Вызывается конкретная функция TaskScheduler.submitTasks. TaskScheduler инициализирует TaskSetManager для каждого TaskSet для управления его жизненным циклом. Когда TaskScheduler получает вычислительные ресурсы Executor на Worker узел В это время TaskSetManager отправит конкретную задачу исполнителю для выполнения. Если во время выполнения задачи произойдет сбой, TaskSetManager также будет отвечать за ее обработку, он уведомит DAGScheduler о завершении текущей задачи и добавит неудавшуюся задачу в очередь для повторного выполнения для последующего пересчета. число повторных попыток по умолчанию — 4. Далее, метод обработки этой логики — TaskSetManager.handleFailedTask. После выполнения задачи TaskSetManager вернет результат в DAGScheduler для последующей обработки.

Конкретным классом реализации TaskScheduler является TaskSchedulerImpl, один из которых очень важен для отправки задач.Исходный код этого метода выглядит следующим образом:

В этом методе будет создан TaskSetManager, а идентификатор этапа и TaskSetManager будут управляться соответственно через HashMap. После этого будет вызван метод addTaskSetManager ScheduleableBuilder для добавления в него созданного TaskSetManager.SchedulerableBuilder определит порядок планирования TaskSetManager и определит, в каком ExecutorBackend запускается каждая задача. В конце метода submitTasks будет вызываться backend.receiveOffer. Конкретным типом бэкенда обычно является CoarseGrainedSchedulerBackend, который является подклассом SchedulerBackend. Метод driverEndPoint.send вызывается в методе vanillaOffers. Этот метод будет отправлять ReceiveOffers. сообщение в DriverEndPoint, которое активирует базовое расписание ресурсов. Метод получения driverEndPoint соответствует сообщению ReceiveOffers и вызывает метод makeOffers, который выглядит следующим образом:

Этот метод получит активного исполнителя и сгенерирует все предложения работы, которые могут быть использованы для вычислений в соответствии с активным исполнителем.При создании предложений работы будет передана такая информация, как идентификатор исполнителя, хост и доступное ядро.Информация о доступной памяти был получен в другом месте. Метод makeOffers также вызывает метод resourceOffers планировщика.Этот метод предназначен для предоставления ресурсов для workOffers, используемых для расчета, и равномерного распределения задач для каждого workOffer (Исполнителя) для расчета. У меня как-то тут возник вопрос, то есть отправляются ли задачи каждому Исполнителю для расчета последовательно, то есть если есть 100 Задач и 5 Исполнителей, всегда ли распределение задач в соответствии с Исполнителем №0 и №1? 1 Executor? , Executor №2... распределяются в таком порядке, то есть Executor №0 всегда получает задачу с id TaskId %5, а Executor №1 всегда получает задачу с id TaskId % 5 + 1. Но чтение исходного кода обнаружило, что одна из ссылок предназначена для выполнения операции перемешивания, которая вызывает Random.shuffle(offers), то есть перемешивает порядок workOffers (исполнителей) в Seq, чтобы не всегда помещать задачи в одну и ту же группу worker nodes, мы можем ясно видеть в последующем методе resourceOfferSingleTaskSet, что конкретный процесс распределения задач фактически осуществляется в соответствии с порядком workerOffers в Seq.В исходном коде для чтения доступен простой for traversal workerOffers. основные ресурсы и распределить доступные ресурсы в TaskSetManager для расчета соответствующего TaskSet:

Исходный код просматривается по индексу shuffledOffers, так как операция shuffle выполнялась ранее, порядок workerOffers каждый раз нарушается, поэтому при назначении задач здесь не всегда будут присвоены соответствующие номера id в workerOffers в определенном порядке. Вместо этого он будет назначать задачи workerOffer в случайном порядке и вне очереди, но назначение задач также будет учитывать локальность задач. При назначении задач соответствующие ресурсы Executor будут вводиться в resourceOffer метод TaskSetManager, который вернет требуемое TaskDescription вычисляемой Задачи, очень важным основанием здесь является попытка выделить Исполнителю задачи с высокой локальностью вычислений. Локальный приоритет данных от высокого к низкому: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY, поэтому для некоторых задач данные всегда находятся на узле, поэтому задача всегда будет назначена Исполнителю на узле. расчет, эффект нарушения распределения workerOffers ранее может показаться не особенно очевидным, и вы обнаружите, что некоторые задачи были рассчитаны на некоторых узлах. DAGScheduler также будет учитывать локальность, но DAGScheduler рассматривается на уровне данных, который можно определить на уровне RDD, в то время как TaskScheduler рассматривает локальность расчета с точки зрения конкретного расчета.Это более конкретное базовое планирование, которое удовлетворяет локальности данных. и вычислительная локальность.

В методе resourceOfferSingleTaskSet мы видим, что есть переменная CPUS_PER_TASK, я всегда понимал, что задача выполняется ядром процессора, но на самом деле эта переменная исходит из параметра конфигурации spark.task.cpus, когда мы устанавливаем этот параметр, когда значение равно 2, задача будет назначена на 2 ядра. Из Stack Overflow я узнал, что настройка этого параметра на самом деле предназначена для удовлетворения потребностей некоторых специальных задач. Некоторые задачи могут иметь несколько потоков внутри или запускать дополнительные задачи. Потоки выполняют другие интерактивные операции, и этот параметр может гарантировать, что общая потребность в ресурсах ядра не превысит определенного установленного значения при выполнении в соответствии с заданными условиями (но здесь нет обязательного требования, если количество потоков, запускаемых Задачей, больше, чем set spark.task .cpus не будет проблемой, но может привести к вытеснению ресурсов из-за превышения определенного значения, что повлияет на эффективность).

Набор задач для выделения ресурсов на самом деле имеет определенный порядок.В методе TaskSchedulerImpl.resourceOffers вызывается rootPool.getSortedTaskSetQueue для получения набора задач, отсортированного в соответствии с определенными правилами для обработки обхода.Здесь правило - ранее упомянутое FIFO или FAIR, ссылаясь на это приоритет вычислений, принадлежащих TaskSet этапа. Функция resourceOffers также будет отмечать каждый живой слейв в начале, записывать имя его хоста и отслеживать добавлен ли новый Executor.Возможная ситуация здесь такова, что некоторые Executors зависают и перезапускают новый.TaskSet добавляется в вычислительную запись информации о ресурсе при вычислении запроса.

После выделения ресурсов задачи будет получено TaskDescription задачи, а затем CoarseGrainedSchedulerBackend вызывает метод launchTasks для отправки задачи в соответствующий ExecutorBackend для выполнения.

Если размер задачи превышает maxRpcMessageSize (по умолчанию 128M) после сериализации, она будет отброшена.В противном случае executorData получаются в соответствии с executorId, записанным в TaskDescription для выполнения задачи, а количество ядер, необходимых для выполнения задачи, равно вычитается из freeCores и отправляется с использованием метода отправки executorEndPoint. LaunchTask выполняет ExecutorBackend указанного Executor. LaunchTask — это класс case, а сохраненное содержимое — сериализованная задача.

использованная литература:

  • Бизнес-трилогия Spark Big Data / Ван Цзялинь, Дуань Чжихуа, Ся Ян под редакцией Пекин: издательство Университета Цинхуа, 2018 г.
  • stackoverflow.com/questions/3…