Механизм отказоустойчивости Spark Streaming

Spark

предисловие

···

1. Отказоустойчивость Spark Streaming

На этом этапе мы запускаем задачу приложения. В соответствии с режимом, который мы запускаем, и типом работающего кластера сервер будет выбран в качестве сервера драйвера в соответствии с определенной стратегией. После завершения его инициализации эти исполнители будут инициализированы путь.

После этого Драйвер отправляет Получателя Исполнителю, а Получатель отвечает за получение данных (можно также рассматривать это как задачу). Он хранит данные, которые получает, в виде блока каждые 200 миллисекунд, и этот блок обязательно будет иметь механизм копирования.

Когда блочный блок сгенерирован, он вернет драйверу отчетную информацию, чтобы сообщить ему о существовании блоков, а затем драйвер сгенерирует небольшую задачу и передаст ее этим исполнителям для завершения.

1.1 Исполнитель зависает

Тогда что нам делать, если наш Executor завис?

Во-первых, если виснет Executor внизу картинки, то это совсем не мешает, т.к. все данные в Executor выше, но когда Executor сверху тоже виснет, то его Receiver естественно не будет работать должным образом, и данные будут потеряны.

Вы думали, что я скажу это 😏? На самом деле это не потому, что сам Spark Streaming имеет хороший механизм отказоустойчивости: когда Executor с Receiver зависает, Driver автоматически находит другого Executor для повторного создания Receiver, потому что данные имеют резервные блоки, поэтому не Слишком много, чтобы беспокоиться о потере данных

А те задачи, которые распределены на приостановленный Исполнитель, также будут перераспределены для повторного выполнения, что не требует ручного вмешательства.

1.2 Водитель вешает трубку

Тогда, если Драйвер зависнет, все Экзекуторы тоже выйдут из строя.В это время нам нужно использовать механизм checkPoint, который будет периодически записывать информацию о Драйвере в HDFS.

Согласно этой идее, цель, которую мы хотим достичь, — позволить Драйверу автоматически восстанавливаться при зависании, а затем вычислять последний результат без вмешательства человека.

1.2.1 Настройка автоматического перезапуска драйвера

На данный момент предполагается, что мы находимся в режиме кластера, поскольку отказоустойчивость возможна только в режиме кластера.

Standalone

Добавьте следующие два параметра в spark-submit:

--deploy-mode cluster

--supervise(这个就是让任务自动重启的参数)

Yarn(в большинстве случаев)

Добавьте следующие параметры в spark-submit:

--deploy-mode cluster

установить в конфигурации пряжиyarn.resourcemanager.am.max-attemps, чтобы установить максимальное количество раз, когда задача может завершиться неудачей. Предположим, этот параметр установлен на 3 (наша компания в 3 раза 😶), тогда когда наша задача зависнет, Yarn автоматически перезапустит ее для нас, и если она зависнет в третий раз, то действительно зависнет.

Mesos

Marathon может перезапустить приложение Mesos

1.2.2 Установите каталог контрольной точки HDFS

Укажите каталог контрольной точки для хранения важной информации, иначе ее нельзя будет восстановить. Как правило, каталог, который мы устанавливаем, — это каталог HDFS, который сам по себе обладает характеристиками высокой доступности и не имеет единой точки отказа.

Всего одна строка кода, ничего особенного

streamingContext.setCheckpoint(hdfsDirectory) 

1.2.3 code

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one


val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

Логика кода на самом деле getOrCreate.Если есть данные в checkpointDirectory, восстанавливаем, если нет данных, создаем Driver самостоятельно.

1.3 Проблема потери данных исполнителя

В это время мы можем восстановить Драйвер, но данные в памяти Исполнителя также теряются из-за простоя Драйвера, поэтому нам также приходится реализовывать некоторые механизмы отказоустойчивости для этой части данных.

Механизм WAL таков: например, когда Kafka отправляет данные через Data Stream, Получатель откроет механизм WAL после их получения и запишет данные в HDFS.После завершения записи Получатель уведомит Kafka о том, что данные был успешно получен (на данный момент для подтверждения Kafka установлено значение -1), этот метод на самом деле точно такой же, как и checkPoint, просто изменено имя WAL

1.3.1 Настройка каталога контрольных точек

streamingContext.setCheckpoint(hdfsDirectory)

Нет необходимости расширять это, что я только что сказал

1.3.2 Включить журнал WAL

По умолчанию он не включен, поэтому вручную установите для него значение true.

sparkConf.set(“spark.streaming.receiver.writeAheadLog.enable”, “true”)

1.3.3 reliable receiver

Не вводите себя в заблуждение, на самом деле это вопрос надежных источников данных. Этот надежный источник данных относится к Kafka. Когда данные записываются в WAL, он сообщает Kafka, что данные были использованы. Для данных, которые не возвращаются в Kafka, данные могут быть повторно использованы из Kafka.

Это кажется немного завихрением, но на самом деле это так: Receiver хочет записать данные в HDFS, в это время, если программа зависнет, писать некогда, то мы будем перевыгружать их из Kafka в на этот раз, то Kafka Он действительно может поддерживать откат данных, а затем отправлять ранее отправленные данные в Receiver (о чем было написано в серии статей Kafka).

Что такое ненадежный источник данных, такой как Socket, он не может сохранить прежние данные, если они потеряны, то они потеряны, и их нельзя получить из него снова.

1.3.4 Отменить резервное копирование

Используйте StorageLevel.MEMORY_AND_DISK_SER для хранения источников данных, нет необходимости в политике с суффиксом 2 (по умолчанию используется политика с суффиксом 2), поскольку HDFS уже имеет несколько копий.

1.4 Решить проблему, что некоторая задача выполняется очень медленно

Нам нужно включить механизм спекуляций:

spark.speculation=true,

После включения этого механизма он через равные промежутки времени будет проверять, какие запущенные задачи необходимо перепланировать.

spark.speculation.interval=100ms

Как мы оцениваем это время задача должна быть перепланирована? Необходимо выполнить два условия. На данный момент мы предполагаем, что есть 10 общих задач

  1. Количество успешных задач > 0,75 * 10, соответствующий параметр 0,75 равен,

    spark.speculation.quantile=0.75

0,75 — значение по умолчанию для этого параметра.

  1. Время выполнения запущенных задач > 1,5 *успехЗапуск задачисреднийвремя, соответствующий параметр 1,5,

    spark.speculation.multiplier=1.5

1,5 — значение по умолчанию для этого параметра.

Если два вышеуказанных условия соблюдены, выполняющаяся задача должна снова дождаться планирования. Если медлительность этой задачи связана с перекосом данных, см.Эта статья расскажет вам обо всех аспектах настройки Spark Core.Решения, упомянутые в

Если мы включим спекулятивное выполнение и случайно столкнемся с перекосом данных, есть шанс столкнуться с очень неловкой ситуацией, то есть время выполнения задачи с перекосом данных должно занимать 10 секунд, а у меня в это время 1~ Осталось 2 секунды После завершения обработки, в это время выполняются два вышеуказанных условия, эта задача распределяется, затем снова выполняется, затем наклоняется, снова распределяется..., эта задача не может быть обработана после бесконечного цикла.

Поэтому перед спекулятивным исполнением надо быть осторожным, оно может решить ситуацию с полным gc или джиттером сети, но если данные будут перекошены, то его существование станет узким местом.

1.5 Семантика потоковой передачи Spark

  1. Данные могут быть потеряны максимум после того, как запись обработана один раз или не обработана.

  2. По крайней мере один раз запись может быть обработана один или несколько раз и может обрабатываться повторно

  3. Ровно один раз запись обрабатывается только один раз, что является наиболее требовательным

finally

···