Правильная поза для потоковой передачи Spark для использования Kafka

Java

предисловие

В игровых проектах необходимо ежедневно выполнять статистику частоты слов для информации о десятках миллионов игровых комментариев.На стороне производителя мы сохраняем данные в Kafka в соответствии с ежедневным временем извлечения, а на стороне потребителя мы используем искру Непрерывно извлекайте данные из kafka для статистики частоты слов. В этой статье сначала кратко описывается способ внедрения потоковой передачи Spark в kafka, затем кратко описывается применение потоковой передачи Spark + kafka в проектах общественного мнения и, наконец, обобщается мой опыт фактической оптимизации потоковой передачи Spark + kafka. (Если есть какие-либо ошибки, добро пожаловать, чтобы добавить их, я исправлю их как можно скорее ^v^)

Потоковые данные Spark получены Kafka

Чтобы использовать искровую потоковую передачу для обработки данных в kafka, первым шагом, конечно же, является получение данных и преобразование их в структуру данных Dstream в искровой потоковой передаче. Есть два способа получить данные: 1. Использовать Receiver для получения данных, 2. Считать данные напрямую из kafka.

Подход на основе приемника

Таким образом, приемник (Receiver) используется для получения данных в kafka, и самым простым является использование высокоуровневого пользовательского API-интерфейса Kafka. Для всех приемников данные, полученные от kafka, будут храниться в исполнителе spark, а затем задание, отправленное посредством spark streaming, будет обрабатывать данные. Как показано ниже:
Receiver图形解释
При использовании нам нужно добавить соответствующий пакет зависимостей:

<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

Основное использование Scala выглядит следующим образом:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Следует отметить еще несколько моментов:

  • В методе Receiver разделы в Spark не связаны с разделами в kafka, поэтому, если мы увеличим количество разделов на тему, мы просто увеличим количество потоков для обработки тем, потребляемых одним Receiver. Но это не увеличивает параллелизма Spark в обработке данных.
  • Для разных групп и тем мы можем использоватьНесколько приемниковСоздавайте разные потоки данных для параллельного получения данных, которые затем можно использовать.союз для объединенияв D-поток.
  • Если мы включим журналы опережающей записи для копирования в файловую систему, такую ​​как HDFS, то уровень хранения должен быть установлен на StorageLevel.MEMORY_AND_DISK_SER, что равноKafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

прямое чтение

После spark1.3 был введен прямой режим. В отличие от метода Receiver, метод Direct не имеет слоя получателя, который периодически получает последние смещения в каждом разделе каждой темы в Kafka, а затем обрабатывает каждый пакет в соответствии с установленным значением maxRatePerPartition. Его форма следующая:

Преимущества этого метода по сравнению с методом приемника:

  • упрощенный параллелизм: В методе Receiver мы упомянули, что после создания нескольких Receivers мы используем union для объединения их в метод Dstream для улучшения параллелизма передачи данных. А в прямом режимеРаздел в Kafka соответствует разделу в RDD один к одномуПараллельное чтение данных Kafka, эти отношения сопоставления также более способствуют пониманию и оптимизации.
  • Эффективный: В методе Receiver, чтобы добиться нулевой потери данных, данные должны храниться в журнале упреждающей записи, чтобы две копии данных сохранялись в Kafka и в журнале, что является пустой тратой! Второй метод не имеет этой проблемы.Пока время хранения данных Kafka достаточно велико, мы можем восстановить данные из Kafka.
  • ровно один раз: в методе Receiver высокоуровневый API-интерфейс Kafka используется для получения значения смещения из Zookeeper, что также является традиционным способом чтения данных из Kafka, но поскольку данные, потребляемые Spark Streaming, и смещение, записанное в Zookeeper, не Синхронизация, которая иногда приводит к повторному потреблению данных. Во втором методе напрямую используется простой низкоуровневый API Kafka, а Offsets использует для записи контрольные точки Spark Streaming, что устраняет это несоответствие.

Вышеприведенное в основном представляет собой простой перевод официального документа [1].Для получения подробной информации вы можете непосредственно прочитать официальный документ и не будете повторять его здесь.

В отличие от Receiver, значение смещения считывается из Zookeeper, затем zookeeper естественным образом сохраняет значение смещения текущего потребления, а затем, если потребление перезапускается, оно продолжит использовать последнее значение смещения. В методе Direct мы считываем данные напрямую из kafka, поэтому смещение нужно записывать само по себе, которое можно записать с помощью контрольной точки, базы данных или файловых записей, либо записать обратно в zookeeper.
В примере KafkaManager — это общий класс, а KafkaCluster — это класс в исходном коде kafka. Из-за разрешения на имя пакета я поместил его отдельно. ComsumerMain просто показывает использование общего класса. Просмотрите последние смещения записи потребления из zooker, и после завершения каждой пакетной обработки смещения будут синхронизироваться с zookeeper.

Spark пишет данные в Кафку

Вышеизложенное объясняет, как Spark считывает данные из Kafka в потоковом режиме.Затем я организую и записываю данные в Kafka. В отличие от чтения данных, Spark не предоставляет унифицированного интерфейса для записи в Kafka, поэтому нам нужно использовать базовый интерфейс Kafka для его переноса.
Самый прямой способ, о котором мы можем думать, выглядит следующим образом:

input.foreachRDD(rdd =>
  // 不能在这里创建KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
) 

Однако недостаток этого метода очевиден.Для каждой записи каждой партиции нам нужно создать KafkaProducer, а затем использовать производителя для выполнения операций вывода.Обратите внимание, что мы не можем размещать вновь созданные задачи KafkaProducer вне foreachPartition, потому что KafkaProducer не сериализуем. Очевидно, что этот подход негибок и неэффективен, поскольку для каждой записи необходимо установить соединение. Как это решить?

  1. Во-первых, нам нужно упаковать KafkaProducer с помощью lazy val следующим образом:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
  1. Затем мы используем форму широковещательных переменных для трансляции KafkaProducer каждому исполнителю следующим образом:
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

Таким образом, мы можем спокойно вводить данные в kafka в каждом исполнителе:

//输出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

Потоковая передача Spark + приложение Kafka

Мониторинг общественного мнения WeTest требует статистики частоты слов в режиме реального времени для информации о комментариях десятков миллионов игроков, просматриваемых каждый день.Для просканированных данных комментариев игроков мы будем создавать их в Kafka, и мы используем Spark для потребителей на другой конец.Потоковая передача используется для потоковой обработки.Сначала мы используем метод Direct, описанный выше, для извлечения пакетов из Kafka, а затем выполняем сегментацию слов, статистику и другую связанную обработку, а также записываем обратно в БД (как и для метода обратной записи БД в Spark, пожалуйста, обратитесь к моему предыдущему сообщению в блоге:Искра наступает на яму - база данных (Hbase+Mysql)), таким образом эффективно выполняя задачу статистики частоты слов для больших объемов данных каждый день в режиме реального времени.

Потоковая передача Spark + настройка Kafka

При использовании потоковой передачи Spark + Kafka, когда объем данных невелик, конфигурация и использование по умолчанию часто могут соответствовать ситуации, но когда объем данных велик, необходимо внести определенные корректировки и оптимизации, и эта корректировка и сама оптимизация тоже. Разные сценарии требуют разных конфигураций.

Разумное время партии (batchDuration)

Почти во всех документах по настройке Spark Streaming упоминается корректировка пакетного времени.При инициализации StreamingContext существует параметр, который устанавливает пакетное время. Если это значение установлено слишком коротким, то есть задания, созданные каждым batchDuration, не могут быть обработаны в течение этого периода, что приведет к накоплению данных и, в конечном итоге, к блокировке Spark Streaming. Более того, как правило, параметр batchDuration не будет меньше 500 мс, потому что если он слишком мал, SparkStreaming будет часто отправлять задания, что вызовет дополнительную нагрузку на весь стриминг. В обычных приложениях, в соответствии с различными сценариями приложений и конфигурациями оборудования, я устанавливаю его в диапазоне от 1 до 10. Мы можем настроить batchDuration, наблюдая за общей задержкой в ​​соответствии с интерфейсом визуального мониторинга SparkStreaming, как показано на следующем рисунке:

Разумная сумма кафки (MaxRatePartition важно)

Для сценария приложения, в котором Spark Streaming использует данные в kafka, эта конфигурация очень важна Параметр конфигурации: spark.streaming.kafka.maxRatePerPartition. Этот параметр не в сети по умолчанию, то есть сколько данных есть в кафке будет напрямую вытащено. В зависимости от скорости, с которой производитель записывает данные в Kafka, и скорости, с которой потребитель обрабатывает данные, этот параметр необходимо комбинировать с указанным выше параметром batchDuration, чтобы данные, извлекаемые каждым разделом в течение каждого batchDuration, могли обрабатываться плавно. возможная пропускная способность, и настройка этого параметра может относиться к входной скорости и времени обработки в интерфейсе визуального мониторинга, как показано ниже:

Кэшировать повторно используемые Dstreams (RDD)

Если RDD в Spark и Dstream в SparkStreaming используются повторно, лучше всего использовать cache() для кэширования потока данных, чтобы предотвратить накладные расходы на сеть, вызванные чрезмерными ресурсами планирования. Вы можете обратиться к наблюдению за параметром задержки планирования, как показано ниже:

Настройте разумный GC

Те, кто давно использует Java, знают, что механизм сборки мусора в JVM позволяет не уделять слишком много внимания выделению и утилизации памяти, а больше сосредоточиться на бизнес-логике, и JVM сделает это для нас. Друзья, у которых есть некоторое представление о JVM, должны знать, что в виртуальной машине Java память делится на поколение eden, молодое поколение, старое поколение и постоянное поколение.Вторичный GC занимает определенное время, особенно переработка GC старого возраста , которому нужно организовать фрагментацию памяти, обычно методом пометки-очищения. В той же программе Spark частота и время JVM GC также являются ключевыми факторами, влияющими на эффективность всего Spark. При обычном использовании рекомендуется:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

Установите разумное количество ресурсов ЦП

Количество ядер ЦП, каждый исполнитель может занимать одно или несколько ядер, можно понять использование вычислительных ресурсов через наблюдаемые изменения использования ЦП, например, очень распространенным типом потерь является то, что исполнитель занимает несколько ядер, но в целом Использование ЦП невелико (поскольку исполнитель не всегда в полной мере использует возможности многоядерности), на этот раз, чтобы рассмотреть это, исполнитель занимает меньше ядра, при добавлении большего количества рабочих исполнителей ниже или хоста больше рабочих увеличивается выше. executor для увеличения количества параллельных операций, тем самым повышая загрузку ЦП. Но увеличение исполнителя необходимо учитывать при хорошем потреблении памяти, потому что машинная память, выделенная большему исполнителю, меньше памяти каждого исполнителя, так что возникает ситуация, когда слишком много данных переливается даже из памяти.

Установите разумный параллелизм

раздел и параллелизм, раздел относится к количеству осколков данных. Каждая задача может обрабатывать данные только одного раздела. Если это значение слишком мало, объем данных на срез будет слишком большим, что приведет к нехватке памяти или вычислительной мощности из многих исполнителей не может быть использован, он полностью используется, но если он слишком велик, это вызовет слишком много осколков и снизит эффективность выполнения. При выполнении операций типа действия (например, различных операций сокращения) в качестве количества разделов будет выбран самый большой в родительском RDD. Под параллелизмом понимается количество разделов, возвращаемое по умолчанию, когда RDD выполняет операции сокращения (а при выполнении операций отображения количество разделов обычно берется из большего родительского RDD, и перемешивание не задействовано. , поэтому этот параметр параллелизма не имеет эффект). Таким образом, эти два понятия тесно связаны, и оба предполагают сегментирование данных, а способ действия фактически унифицирован. Количество сегментов по умолчанию можно задать с помощью spark.default.parallelism, и многие операции RDD могут указывать параметр раздела для явного управления определенным количеством сегментов.
При использовании SparkStreaming+kafka мы используем метод прямого подключения.Как объяснялось выше, партиция в Spark и партиция в Kafka находятся во взаимно-однозначном соответствии.Мы обычно устанавливаем количество партиций в Kafka по умолчанию.

Используйте высокопроизводительные операторы

Здесь есть ссылка на сообщение в блоге технической команды Meituan, и никаких конкретных тестов производительности не проводилось.Предложения заключаются в следующем:

  • Используйте undedbykey / Aggrugebybey, а не Groupbykey
  • Используйте mapPartitions вместо карты нормалей
  • Используйте foreachPartitions вместо foreach
  • Операция объединения после использования фильтра
  • Используйте repartitionAndSortWithinPartitions вместо операций перераспределения и сортировки.

Оптимизация производительности сериализации с помощью Kryo

Я сам не проверял этот принцип оптимизации, но во многих документах по оптимизации он упоминается и записывается здесь.
В Spark сериализация в основном задействована в трех местах:

  • Когда внешняя переменная используется в функции оператора, эта переменная будет сериализована для передачи по сети (см. объяснение в «Принципе 7: Широковещательная передача больших переменных»).
  • Когда пользовательский тип используется в качестве общего типа RDD (например, Javardd, студент - это пользовательский тип), все объекты пользовательского типа будут сериализованы. Следовательно, в этом случае также требуется, чтобы пользовательский класс должен реализовать интерфейс сериализации.
  • При использовании сериализуемой стратегии сохраняемости (например, MEMORY_ONLY_SER) Spark сериализует каждый раздел в RDD в большой массив байтов.

Для этих трех мест, где происходит сериализация, мы можем оптимизировать производительность сериализации и десериализации с помощью библиотеки сериализации Kryo. Spark по умолчанию использует механизм сериализации Java, то есть API ObjectOutputStream/ObjectInputStream для сериализации и десериализации. Однако Spark также поддерживает использование библиотеки сериализации Kryo.Производительность библиотеки сериализации Kryo намного выше, чем у библиотеки сериализации Java. Согласно официальному представлению, механизм сериализации Kryo имеет примерно в 10 раз более высокую производительность, чем механизм сериализации Java. Причина, по которой Spark не использует Kryo в качестве библиотеки сериализации по умолчанию, заключается в том, что Kryo требует, чтобы все пользовательские типы, которые необходимо сериализовать, были зарегистрированы, поэтому этот метод более проблематичен для разработчиков.

Ниже приведен пример кода с использованием Kryo, нам нужно только установить класс сериализации, а затем зарегистрировать пользовательский тип для сериализации (например, тип внешней переменной, используемый в операторной функции, пользовательский тип как универсальный тип RDD и т. д. .) :

// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

результат

После всех видов отладки, оптимизированных для достижения нашей конечной цели, Spark Streaming может извлекать данные Kafka в режиме реального времени и может оставаться стабильным, как показано ниже:

Конечно, разные сценарии приложений будут иметь разную графику.Это диаграмма мониторинга после оптимизации и стабилизации статистики частоты слов в этой статье.Мы видим, что на столбчатой ​​диаграмме времени обработки есть стабильная пунктирная линия, а большинство пакетов может быть в этой пунктирной линии Следующая обработка завершена, что указывает на то, что общая потоковая передача Spark работает стабильно.

использованная литература

  1. Spark Streaming + Kafka Integration Guide
  2. Разница между анализом исходного кода DirectStream и Stream-SparkStreaming 02
  3. Руководство по оптимизации производительности Spark — основы
  4. Настройка производительности Spark
  5. How to write to Kafka from Spark Streaming


Раздел благосостояния:

«Дорога к большим данным становится Богом»

«Сотни загрузок TJava и ресурсов больших данных»