Spark Streaming VS Flink

задняя часть Kafka Flink Spark

В этой статье Spark Streaming и Flink сравниваются с точки зрения модели программирования, планирования задач, временного механизма, восприятия динамического раздела Kafka, отказоустойчивости и семантики обработки, а также обратного давления. Эта статья длинная, рекомендуется сначала ее собрать~

Сравнение архитектуры

запустить роль

Роли среды выполнения Spark Streaming (автономный режим) в основном включают:

  • Мастер: в основном отвечает за общее управление ресурсами кластера и планирование приложений;

  • Рабочий: отвечает за управление ресурсами отдельного узла, запуск драйверов и исполнителей и т. д.;

  • Драйвер: место, где выполняется программа входа пользователя, то есть место, где выполняется SparkContext, в основном генерация DAG, разделение этапов, генерация задач и планирование;

  • Исполнитель: отвечает за выполнение задач, обратную связь о статусе выполнения и результатах выполнения.

Роли среды выполнения Flink (автономный режим) в основном включают:

  • Jobmanager: координирует распределенное выполнение, планирует задачи, координирует контрольные точки, координирует восстановление после сбоя и т. д. Существует по крайней мере один JobManager. В случае высокой доступности может быть запущено несколько JobManager, один из которых выбирается ведущим, а остальные находятся в режиме ожидания;

  • Диспетчер задач: отвечает за выполнение определенных задач, кэширование и обмен потоками данных, имеется как минимум один диспетчер задач;

  • Слот: каждый слот задачи представляет собой фиксированную часть ресурсов диспетчера задач, а количество слотов представляет собой количество задач, которые диспетчер задач может выполнять параллельно.

экология

Рисунок 1: Экология потоковой передачи Spark, официальный сайт VIA Spark

Рисунок 2: Экосистема Flink, с официального сайта Flink.

запустить модель

Spark Streaming — это микропакетный процесс. При запуске необходимо указать время пакета. Каждый раз при запуске задания обрабатывается пакет данных. Процесс показан на рисунке 3:

Рисунок 3, с официального сайта Spark.

Flink управляется событиями, а события можно понимать как сообщения. Приложение, управляемое событиями, — это приложение с отслеживанием состояния, которое внедряет события из одного или нескольких потоков, обновляет состояние, запуская вычисления или внешние действия, которые реагируют на внедренные события.

Рисунок 4, с официального сайта Fink.

Сравнение моделей программирования

Сравнение моделей программирования в основном сравнивает различия в написании кода между flink и Spark Streaming.

Spark Streaming

Комбинация Spark Streaming и kafka — это в основном две модели:

  • На основе приемника dstream;

  • На основе прямого dstream.

Механизм программирования вышеуказанных двух моделей аналогичен, но существуют некоторые различия в API и встроенных данных. Новая версия отменила режим на основе приемника, и предприятие обычно принимает режим прямого DStame.

val Array(brokers, topics) = args//    创建一个批处理时间是2s的context    
   val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")    
   val ssc = new StreamingContext(sparkConf, Seconds(2))    
   //    使用broker和topic创建DirectStream    
   val topicsSet = topics.split(",").toSet    
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)    
   val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent,    ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))  
     // Get the lines, split them into words, count the words and print    
   val lines = messages.map(_.value)    
   val words = lines.flatMap(_.split(" "))    
   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)   
    wordCounts.print()     //    启动流    
   ssc.start()    
   ssc.awaitTermination()

Через приведенный выше код мы можем получить:

  • Установить время партии

  • Создать поток данных

  • написать преобразование

  • написать действие

  • начать выполнение

Flink

Далее давайте посмотрим, как flink и kafka объединяются для написания кода. Комбинация Flink и kafka управляется событиями, у вас могут возникнуть сомнения по этому поводу, при потреблении данных kafka и вызове poll данные получаются пакетами (можно задать размер пакета и время ожидания), что нельзя назвать триггером события. По сути, flink внутренне организует опрашиваемые данные, а затем выдает их один за другим, формируя механизм, запускаемый событием. Следующий код — это flink, интегрирующий kafka в качестве источника данных и приемника данных:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.getConfig().disableSysoutLogging();
   env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
   env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
   env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);    
   //    ExecutionConfig.GlobalJobParameters
   env.getConfig().setGlobalJobParameters(null);    DataStream<KafkaEvent> input = env
           .addSource(                new FlinkKafkaConsumer010<>(
                   parameterTool.getRequired("input-topic"),                    new KafkaEventSchema(),
                   parameterTool.getProperties())
               .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance()
           .keyBy("word")
           .map(new RollingAdditionMapper()).setParallelism(0);
   
   input.addSink(            new FlinkKafkaProducer010<>(
                   parameterTool.getRequired("output-topic"),                    new KafkaEventSchema(),
                   parameterTool.getProperties()));
   
   env.execute("Kafka 0.10 Example");

Код от Flink в сочетании с kafka может получить:

  • зарегистрировать источник данных

  • Написать логику запуска

  • Зарегистрировать приемник данных

Вызов env.execute по сравнению со Spark Streaming требует меньше времени для настройки пакета, есть заметное отличие в том, что все операторы ленивы, форма flink, вызов env.execute строит jobgraph. Клиентская сторона Jobgraph отвечает за создание и отправку его в кластерную операцию; операции Spark Streaming и точки действия оператора и преобразование, при этом преобразование является единственной формой отложенного выполнения, и создание DAG, поэтапное разделение, планирование задач выполняется на стороне драйвера. , драйвер клиентского режима, работающий на клиенте.

Принцип планирования задач

Планирование задач Spark

Как упоминалось выше, задачи Spark Streaming основаны на микропакетной обработке, по сути, каждая партия представляет собой задачу Spark Core. Для задачи Spark Core, выполненной с помощью кодирования, она в основном включает следующие части от генерации до окончательного выполнения:

  • Построить граф DAG;

  • разделить этап;

  • генерировать набор задач;

  • Расписание задач.

Подробности см. на рис. 5:

Рисунок 5: Планирование задач Spark

Существует два режима планирования и выполнения заданий: FIFO и Fair, а задачи планируются и выполняются в соответствии с местоположением данных. Предположим, что тема kafka, используемая каждой задачей Spark Streaming, имеет четыре раздела с операцией преобразования (например, map) и операцией сокращения в середине, как показано на рис. 6.

Изображение 6

Предполагая, что есть два исполнителя, каждый с тремя ядрами, фиксирована ли соответствующая позиция выполнения задачи для каждого пакета? Можно ли это предсказать? Из-за локальности данных и неопределенности планирования текущее местоположение задач, созданных каждым пакетом, соответствующим разделам kafka, не является фиксированным.

Планирование задач Flink

Для потоковой задачи клиента FLINK Streamgraph будет сгенерирован первым, тогда работа будет сгенерирован, а затем jobgraph будет отправлен в JOBMANAGER, который завершит преобразование от Jobgraph к исполнению графики и, наконец, запланировано и выполняется работой.

Рисунок 7

Как показано на рисунке 7, существует программа, состоящая из источника данных, MapFunction и ReduceFunction. Параллелизм источника данных и MapFunction равен 4, а параллелизм ReduceFunction равен 3. Поток данных состоит из последовательности Source-Map-Reduce, работающей в кластере с 2 диспетчерами задач, каждый из которых имеет 3 слота задач.

Видно, что после отправки и выполнения генерации топологии flink, если нет ошибки, позиция выполнения компонентов топологии остается неизменной, а параллелизм определяется параллелизмом каждого оператора, подобно шторму. В потоковой передаче spark каждый пакет планируется в соответствии с местоположением данных и условиями ресурсов, и нет фиксированной топологии выполнения. Flink — это поток данных в топологической структуре, а Spark Streaming — это параллельная обработка пакетов кэша данных.

сравнение механизма времени

Время обработки потока

Программа потоковой обработки по концепции в три раза превышает концепцию времени:

  • время обработки

Время обработки относится к системному времени каждой машины, и когда программе потоковой передачи требуется время обработки, будет использоваться машинное время, которое запускает каждый экземпляр оператора. Время обработки — это простейшая концепция времени, не требующая координации между потоками и машинами и обеспечивающая наилучшую производительность с наименьшей задержкой. Однако в распределенных и асинхронных средах время обработки не может гарантировать синхронизацию событий сообщений, поскольку оно ограничено задержками передачи сообщений, скоростью, с которой сообщения передаются между операторами, и т. д.

  • время события

Время события — это время, когда событие произошло на его устройстве, которое было встроено в событие до того, как оно вошло в flink, и затем flink может извлечь это время. Программы потоковой передачи, основанные на времени события, могут гарантировать порядок событий во время обработки, но приложения, основанные на времени события, должны включать механизм водяных знаков. Обработка, основанная на времени события, часто имеет определенную задержку, поскольку она должна ждать последующих событий и обрабатывать неупорядоченные события, и ее следует тщательно учитывать при использовании для приложений, чувствительных ко времени.

  • время впрыска

Время внедрения — это время, когда событие внедряется в flink. Событие получает текущее время источника у оператора источника в качестве времени внедрения события, и последующие операторы обработки на основе времени будут использовать это время для обработки данных.

По сравнению с событием времени время впрыска не может быть вне заказа или отставать от события, но приложение, неупорядоченное указывать, как генерировать водяной знак. Подобное время обработки и время события программы внутри инъекции, но штемпель генерации водяных знаков и распределительной марки автоматически.

На рисунке 8 хорошо видна разница между тремя временами:

Рисунок 8

Механизм синхронизации искры

Spark Streaming поддерживает только время обработки, структурированная потоковая передача поддерживает время обработки и время события, а также поддерживает механизм водяных знаков для обработки данных задержки.

Механизм времени Flink

Flink поддерживает три временных механизма: время события, время внедрения, время обработки, а также поддерживает механизм водяных знаков для обработки запаздывающих данных.

kafka обнаружение динамического разбиения

Spark Streaming

Для предприятий с потребностями бизнеса в режиме реального времени объем данных будет увеличиваться синхронно с ростом бизнеса, что приведет к тому, что исходное количество разделов kafka не будет соответствовать параллелизму, необходимому для записи данных.Необходимо расширить раздел kafka или увеличить тему kafka.В настоящее время программы обработки в реальном времени, такие как SparkStreaming и flink, необходимы для обнаружения недавно добавленных тем и разделов Kafka и использования данных недавно добавленных разделов.

Затем, в сочетании с анализом исходного кода, можно определить, могут ли Spark Streaming и flink динамически обнаруживать новый раздел, а также использовать и обрабатывать данные нового раздела при добавлении темы или раздела в kafka. Комбинация Spark Streaming и kafka имеет две относительно разные версии.На рисунке 9 приведены сравнительные данные, приведенные на официальном сайте:

Рисунок 9

Среди них подтверждено, что комбинация Spark Streaming и kafka версии 0.8 не поддерживает динамическое определение разделов, а поддерживается в комбинации с версией 0.10, а затем посредством анализа исходного кода.

Spark Streaming в сочетании с версией kafka 0.8

*Анализ исходного кода предназначен только для обнаружения разделов

Вход - DirectKafkainPutdream Of Compute:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {//    改行代码会计算这个job,要消费的每个kafka分区的最大偏移
   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))//    构建KafkaRDD,用指定的分区数和要消费的offset范围
   val rdd = KafkaRDD[K, V, U, T, R](
     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    // Report the record number and metadata of this batch interval to InputInfoTracker.
   val offsetRanges = currentOffsets.map { case (tp, fo) =>
     val uo = untilOffsets(tp)      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
   }    val description = offsetRanges.filter { offsetRange =>
     // Don't display empty ranges.
     offsetRange.fromOffset != offsetRange.untilOffset
   }.map { offsetRange =>
     s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
       s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
   }.mkString("\n")    // Copy offsetRanges to immutable.List to prevent from being modified by the user
   val metadata = Map(      "offsets" -> offsetRanges.toList,      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
   ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)
 }

Первая строка — это рассчитанное максимальное смещение, сгенерированное KafkaRDD для пакета, который будет использоваться для каждого раздела. Затем посмотрите lastLeaderOffsets (maxRetries)

@tailrec  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {//    可以看到的是用来指定获取最大偏移分区的列表还是只有currentOffsets,没有发现关于新增的分区的内容。
   val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)    // Either.fold would confuse @tailrec, do it manually
   if (o.isLeft) {      val err = o.left.get.toString      if (retries <= 0) {        throw new SparkException(err)
     } else {
       logError(err)        Thread.sleep(kc.config.refreshLeaderBackoffMs)
       latestLeaderOffsets(retries - 1)
     }
   } else {
     o.right.get
   }
 }

Где protected var currentOffsets = fromOffsets, это инициализируется только при построении DirectKafkaInputDStream и обновляется при вычислении:

currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

В kafka нет кода для обнаружения новых тем или разделов, поэтому можно подтвердить, что комбинация Spark Streaming и kafka 0.8 не поддерживает динамическое обнаружение разделов.

Spark Streaming в сочетании с kafka версии 0.10

Вход также является методом вычисления DirectKafkaInputDStream.Чтобы выбрать основную часть, первая строка в вычислении также предназначена для вычисления максимального смещения каждого раздела, который будет использоваться kafkardd, сгенерированным текущим заданием:

//    获取当前生成job,要用到的KafkaRDD每个分区最大消费偏移值
   val untilOffsets = clamp(latestOffsets())

Код для обнаружения недавно добавленной темы или раздела kafka находится в lastOffsets().

/**   
* Returns the latest (highest) available offsets, taking new partitions into account.   */
 protected def latestOffsets(): Map[TopicPartition, Long] = {    val c = consumer
   paranoidPoll(c)    // 获取所有的分区信息
   val parts = c.assignment().asScala    // make sure new partitions are reflected in currentOffsets
   // 做差获取新增的分区信息
   val newPartitions = parts.diff(currentOffsets.keySet)    // position for new partitions determined by auto.offset.reset if no commit
   // 新分区消费位置,没有记录的化是由auto.offset.reset决定
   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap    // don't want to consume messages, so pause
   c.pause(newPartitions.asJava)    // find latest available offsets
   c.seekToEnd(currentOffsets.keySet.asJava)
   parts.map(tp => tp -> c.position(tp)).toMap
 }

В этом методе есть процесс получения нового раздела кафки и обновляя его в CurrentOffsets, поэтому его можно проверить, что комбинация искровой потоковой передачи и кафка 0.10 поддерживает динамическое обнаружение раздела.

Flink

Начальным классом является FlinkKafkaConsumerBase, который является родительским классом для всех потребителей flink kafka.

Рисунок 10

В методе run FlinkKafkaConsumerBase создается kafkaFetcher, который на самом деле является потребителем:

this.kafkaFetcher = createFetcher(
       sourceContext,
       subscribedPartitionsToStartOffsets,
       periodicWatermarkAssigner,
       punctuatedWatermarkAssigner,
       (StreamingRuntimeContext) getRuntimeContext(),
       offsetCommitMode,
       getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
       useMetrics);

Следующий шаг — создать поток, который периодически обнаруживает новые разделы в kafka и добавляет их в kafkaFetcher.

if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {      final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();      this.discoveryLoopThread = new Thread(new Runnable() {        @Override
       public void run() {          try {            // --------------------- partition discovery loop ---------------------

           List<KafkaTopicPartition> discoveredPartitions;            // throughout the loop, we always eagerly check if we are still running before
           // performing the next operation, so that we can escape the loop as soon as possible

           while (running) {              if (LOG.isDebugEnabled()) {                LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
             }              try {
               discoveredPartitions = partitionDiscoverer.discoverPartitions();
             } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {                // the partition discoverer may have been closed or woken up before or during the discovery;
               // this would only happen if the consumer was canceled; simply escape the loop
               break;
             }              // no need to add the discovered partitions if we were closed during the meantime
             if (running && !discoveredPartitions.isEmpty()) {
               kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
             }              // do not waste any time sleeping if we're not running anymore
             if (running && discoveryIntervalMillis != 0) {                try {                  Thread.sleep(discoveryIntervalMillis);
               } catch (InterruptedException iex) {                  // may be interrupted if the consumer was canceled midway; simply escape the loop
                 break;
               }
             }
           }
         } catch (Exception e) {
           discoveryLoopErrorRef.set(e);
         } finally {            // calling cancel will also let the fetcher loop escape
           // (if not running, cancel() was already called)
           if (running) {
             cancel();
           }
         }
       }
     }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

     discoveryLoopThread.start();
     kafkaFetcher.runFetchLoop();

Выше приведен процесс динамического обнаружения новых разделов в kafka. Однако, в отличие от Spark, который не требует никакой настройки, flink динамически обнаруживает новые разделы в kafka, и эту функцию нужно включить. Это также очень просто, вам нужно установить свойство flink.partition-discovery.interval-millis больше 0.

Механизмы отказоустойчивости и семантика обработки

В этом разделе в основном хотят противопоставить как восстановление, так и то, как обеспечить семантику только одного процесса. На этот раз для вопроса: при обработке в реальном времени, как обеспечить, чтобы данные обрабатывались только один раз семантика?

Spark Streaming гарантирует только одну обработку

Для задач Spark Streaming мы можем установить контрольную точку, а затем, если произойдет сбой и произойдет перезапуск, мы сможем восстановиться с последней контрольной точки, но такое поведение может только предотвратить потерю данных, может обрабатываться повторно и не может достичь точно- после обработки семантики.

Для прямого потока в сочетании со Spark Streaming и kafka вы можете поддерживать смещение для zookeeper, kafka или любой другой внешней системы и отправлять смещение после каждой отправки результата, чтобы при восстановлении после сбоя и перезапуске можно было использовать последнее отправленное смещение. для восстановления, чтобы убедиться, что данные не потеряны. Однако, если сбой произойдет после отправки результата и до отправки смещения, данные будут обрабатываться несколько раз.В это время нам нужно убедиться, что многократный вывод результата обработки не влияет на нормальный бизнес.

Отсюда можно проанализировать, что, предполагая, что данные обрабатываются ровно один раз, вывод результата и фиксация смещения должны быть выполнены в рамках транзакции. Здесь есть два подхода:

  • Вывод действия repartition(1) Spark Streaming становится только одним разделом, что можно сделать с помощью транзакций:
Dstream.foreachRDD(rdd=>{
   rdd.repartition(1).foreachPartition(partition=>{    //    开启事务
       partition.foreach(each=>{//        提交数据
       })    //  提交事务
   })
 })
  • Отправьте результат со смещением

Это результат данных, содержащих смещение. Результаты и это операция смещения подачи завершена, данные не будут потеряны, процесс не будет повторяться. Время восстановления может использовать результаты последней фиксации смещения пояса.

Flink и kafka 0.11 гарантируют только одну обработку

Чтобы приемник поддерживал однократную семантику, данные должны записываться в Kafka транзакционным образом, чтобы при фиксации транзакции все операции записи между двумя контрольными точками выполнялись как одна транзакция. Это гарантирует возможность отката этих записей в случае сбоя или сбоя.

В распределенном приложении с несколькими параллельными приемниками выполнения недостаточно выполнить одну фиксацию или откат, поскольку все компоненты должны согласовывать эти фиксации или откаты для обеспечения согласованных результатов. Flink решает эту проблему, используя двухэтапный протокол фиксации с фазой предварительной фиксации.

Приложение Flink в этом примере состоит из следующих компонентов, как показано на рисунке 11:

  • Источник, который считывает данные из Kafka (например, KafkaConsumer)

  • Вечеринка с временным окном

  • Приемник, который записывает результаты обратно в Kafka (например, KafkaProducer)

Рисунок 11

Ниже приводится подробное объяснение идей двухэтапной отправки flink:

Рисунок 12

Как показано на рис. 12, при запуске контрольной точки Flink она переходит в фазу предварительной фиксации. В частности, после запуска контрольной точки диспетчер задач Flink записывает барьер контрольной точки во входной поток и делит все сообщения в потоке на сообщения, принадлежащие текущей контрольной точке, и сообщения, принадлежащие следующей контрольной точке.Барьер также будет передаваться между операторами. Для каждого оператора барьер запускает серверную часть состояния оператора, чтобы сделать моментальный снимок состояния оператора. Источник данных сохраняет смещение Kafka, а затем передает барьер контрольной точки последующим операторам.

Этот подход работает, только если у оператора есть только его внутреннее состояние. Внутреннее состояние относится к содержимому, сохраняемому и управляемому бэкендами состояния Flink (например, сумма, рассчитанная агрегацией окон во втором операторе).

Когда процесс имеет только свое внутреннее состояние, нет необходимости делать что-либо еще на этапе предварительной фиксации, кроме как записывать изменения данных в серверную часть состояния перед созданием контрольной точки. Когда контрольная точка пройдет успешно, Flink правильно отправит эти записи, а когда контрольная точка завершится неудачно, отправка будет прекращена.Процесс можно увидеть на рисунке 13.

Рисунок 13.

В сочетании с внешними системами внешняя система должна поддерживать транзакции, которые могут быть связаны с двумя этапами отправки протоколов. Очевидно, что SINK в этом примере представляет Kafka Sink, поэтому его необходимо прогнозировать с помощью внешних транзакций на этапе предварительной зарядки DATA SINK. Как показано ниже:

Рисунок 14

Когда барьер пройден через все операторы и инициированная запись моментального снимка завершена, фаза предварительной фиксации завершена. Все моментальные снимки состояния триггера считаются частью контрольной точки, или контрольная точка — это моментальный снимок состояния всего приложения, включая внешнее состояние перед фиксацией. Сбой можно восстановить с контрольной точки. Следующий шаг — уведомить всех операторов об успешном прохождении контрольной точки. На этом этапе менеджер заданий инициирует логику обратного вызова для каждого оператора, чтобы контрольная точка была завершена.

В этом примере у источника данных и оконных операций нет внешнего состояния, поэтому на данном этапе этим двум операторам не нужно выполнять какую-либо логику, но у приемника данных есть внешнее состояние, поэтому мы должны отправлять внешние транзакции в это время, т.к. показано на следующем рисунке:

Рисунок 15

Выше приведена базовая логика для того, чтобы flink реализовал ровно одну обработку.

Back pressure

Скорость потребления потребителей ниже, чем скорость производства производителя. Для того, чтобы сделать приложение нормально, потребители будут обратиться к производителю регулировать скорость производства производителя, так что насколько потребности потребителей и сколько производителю производителя Отказ

* противодавление всегда упоминается как противодавление.

Противодавление для потоковой передачи Spark

Комбинация Spark Streaming и kafka имеет механизм обратного давления, целью которого является корректировка количества сообщений kafka, получаемых в последующих пакетах, в соответствии с обработкой текущего задания. Для достижения этой цели Spark Streaming добавляет в исходную архитектуру RateController. Используемый алгоритм — PID. Требуемые данные обратной связи — это время окончания, время планирования, время обработки и количество сообщений обработки задачи. Эти данные передаются через систему SparkListener. Получите, а затем рассчитайте скорость с помощью вычислений PIDRateEsimator, а затем рассчитайте смещение, а затем сравните его с максимальным количеством сообщений потребления, установленным пределом скорости, чтобы получить максимальное смещение окончательного сообщения до потребляться.

Метод вычисления PIDRateEsimator выглядит следующим образом:

def compute(      time: Long, // in milliseconds
     numElements: Long,      processingDelay: Long, // in milliseconds
     schedulingDelay: Long // in milliseconds
   ): Option[Double] = {
   logTrace(s"\ntime = $time, # records = $numElements, " +
     s"processing time = $processingDelay, scheduling delay = $schedulingDelay")    this.synchronized {      if (time > latestTime && numElements > 0 && processingDelay > 0) {        val delaySinceUpdate = (time - latestTime).toDouble / 1000

       val processingRate = numElements.toDouble / processingDelay * 1000

       val error = latestRate - processingRate        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis        // in elements/(second ^ 2)
       val dError = (error - latestError) / delaySinceUpdate        val newRate = (latestRate - proportional * error -
                                   integral * historicalError -
                                   derivative * dError).max(minRate)
       logTrace(s"""            | latestRate = $latestRate, error = $error            | latestError = $latestError, historicalError = $historicalError            | delaySinceUpdate = $delaySinceUpdate, dError = $dError            """.stripMargin)

       latestTime = time        if (firstRun) {
         latestRate = processingRate
         latestError = 0D
         firstRun = false
         logTrace("First run, rate estimation skipped")          None
       } else {
         latestRate = newRate
         latestError = error
         logTrace(s"New rate = $newRate")          Some(newRate)
       }
     } else {
       logTrace("Rate estimation skipped")        None
     }
   }
 }

Противодавление во Флинке

В отличие от обратного давления Spark Streaming, обратное давление Flink заключается в том, что менеджер заданий инициирует 100 вызовов Thread.getStackTrace() для каждой задачи каждые 50 мс, чтобы определить коэффициент блокировки. Процесс показан на рисунке 16:

16

Коэффициент блокировки в сети делится на три уровня:

  • OK: 0

  • НИЗКИЙ: 0,10

  • ВЫСОКИЙ: 0,5