Как Spark Streaming управляет смещениями Kafka

Kafka Spark

Перевод инженерного блога Cloudera: управление смещением для Apache Kafka с потоковой передачей Apache Spark

Приложения Spark Streaming часто получают информацию от Kafka. Чтение непрерывных данных из Kafka будет иметь много преимуществ, таких как хорошая производительность и скорость. Однако пользователи должны управлять смещениями Kafka, чтобы гарантировать, что приложение Spark Streaming сможет правильно считывать данные после его смерти. В этой статье мы обсудим, как управлять смещениями в будущем.


содержание

  • Обзор управления смещением
  • Сохранение смещений во внешней системе
    • Spark Streaming checkpoints
    • Сохранить смещения в HBase
    • Сохранить смещения в ZooKeeper
    • Хранить смещения в самой Kafka
    • другие методы
  • Суммировать

Обзор управления смещением

Spark Streaming интегрируется с Kafka, чтобы пользователи могли читать данные из одной или нескольких тем из Kafka. Тема Kafka содержит несколько разделов, в которых хранятся сообщения. Сообщения в каждом разделе хранятся последовательно и помечаются смещением (которое можно рассматривать как позицию). Разработчик может управлять позицией чтения данных с помощью смещения в своем приложении Spark Streaming, но для этого требуется хороший механизм управления смещением.

Управление смещениями очень полезно для обеспечения согласованности данных на протяжении всего жизненного цикла потоковых приложений. Например, если смещение не сохраняется в постоянной базе данных до того, как приложение остановится или завершит работу с ошибкой, диапазоны смещений будут потеряны. Кроме того, если смещение, прочитанное каждым разделом, не сохраняется, то Spark Streaming не может продолжить чтение сообщений с того места, где оно было отключено (остановлено или вызвано ошибкой).


Spark-Streaming-flow-for-offsets.png

На приведенной выше диаграмме показан обычный процесс смещения управления приложениями Spark Streaming. Смещениями можно управлять несколькими способами, но обычно следуйте приведенным ниже шагам:

  • При инициализации Direct DStream необходимо указать смещение, содержащее каждый раздел каждой темы, чтобы Direct DStream мог считывать данные из указанного расположения.
    • offsets — это позиция смещения, сохраненная на шаге 4.
  • читать и обрабатывать сообщения
  • Сохранение данных результата после обработки
    • с пунктирным кругомСохранить и зафиксировать смещениеПросто подчеркните, что пользователи могут выполнять ряд действий, чтобы удовлетворить свои более строгие семантические требования. Сюда входят идемпотентные операции и сохранение смещений посредством атомарных операций.
  • Наконец, сохраните смещения во внешней постоянной базе данных, такой как HBase, Kafka, HDFS и ZooKeeper.

Различные решения могут быть объединены в соответствии с различными потребностями бизнеса. Spark имеет удобную парадигму программирования, которая позволяет пользователю контролировать сохранение смещений. Внимательно рассмотрите следующую ситуацию: приложение Spark Streaming считывает данные из Kafka, обрабатывает или преобразует данные, а затем отправляет данные в другую тему или другую систему (например, другие системы обмена сообщениями, Hbase, Solr, СУБД и т. д.). В этом примере мы рассматриваем только сообщения, обработанные и отправленные в другие системы.

Сохранение смещений во внешней системе

В этой главе в будущем мы рассмотрим различные варианты внешнего постоянного хранилища.

Чтобы лучше понять, о чем говорится в этой главе, давайте сделаем несколько предварительных выводов. Если используете spark-streaming-kafka-0-10, то рекомендуем ставитьenable.auto.commitУстановите значение «ложь». Эта конфигурация действует только в этой версии,enable.auto.commitЕсли установлено значение true, это означает, что смещения будут следоватьauto.commit.interval.msНастроенный интервал автоматически периодически отправляется в Kafka. В Spark Streaming установка для этого параметра значения true приведет к автоматической фиксации приложения Spark после чтения данных из kafka, а не после обработки данных, чего мы не хотим. Поэтому, чтобы лучше контролировать фиксацию смещений, мы рекомендуем поставитьenable.auto.commitУстановите значение «ложь».

Spark Streaming checkpoints

Создание контрольных точек с помощью Spark Streaming — это простейший метод хранения, который легко реализовать в среде Spark. Контрольные точки Spark Streaming предназначены для сохранения состояния приложения, и мы перенаправим его в HDFS, чтобы иметь возможность восстанавливать данные после сбоев.

Выполнение операции контрольной точки в Kafka Stream делает смещение сохраненным в контрольной точке.Если приложение зависает, функция приложения SparkStreamig может начать чтение сообщений с сохраненного смещения. Однако, если вы обновляете приложение Spark Streaming, то, извините, данные, которые не могут быть проверены, не могут быть использованы, поэтому этот механизм не является надежным, особенно в строгой производственной среде, мы не рекомендуем этот метод.

Сохранить смещения в HBase

HBase можно использовать в качестве надежной внешней базы данных для сохранения смещений. Сохраняя смещения во внешней системе, приложения Spark Streaming могут повторно считывать или воспроизводить любые данные, все еще хранящиеся в Kafka.

Согласно шаблону проектирования HBase, он позволяет приложениям хранить несколько приложений Spark Streaming и несколько тем Kafka в таблице со структурой строки и столбца. В этом примере таблица названа с именем темы, идентификатором группы потребителей и Spark Streaming.batchTime.milliSecondsКак rowkey для уникальной идентификации. несмотря на то чтоbatchTime.milliSecondsНе требуется, но дает лучшее представление исторических смещений для каждой партии. Таблица будет хранить накопленные данные за 30 дней и будет удалена, если она превысит 30 дней. Ниже приведен DDL и структура для создания таблицы.

DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}

RowKey Layout:
row:              <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family:    offsets
qualifier:        <PARTITION_ID>
value:            <OFFSET_ID>

Для каждого пакета сообщений используйтеsaveOffsets()Сохраните смещения, считанные из указанной темы, в HBase.

/*
Save offsets for each batch into HBase
*/

def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],

                hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) ={

  val hbaseConf = HBaseConfiguration.create()

  hbaseConf.addResource("src/main/resources/hbase-site.xml")

  val conn = ConnectionFactory.createConnection(hbaseConf)

  val table = conn.getTable(TableName.valueOf(hbaseTableName))

  val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" +String.valueOf(batchTime.milliseconds)

  val put = new Put(rowKey.getBytes)

  for(offset <- offsetRanges){

    put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),

          Bytes.toBytes(offset.untilOffset.toString))

  }

  table.put(put)

  conn.close()

}

Перед выполнением потоковой задачи сначала используйтеgetLastCommittedOffsets()чтобы прочитать смещения, сохраненные в конце последней задачи из HBase. Этот метод будет использовать обычную схему для возврата смещения раздела темы kafka.

  • Сценарий 1. Задача Streaming запускается в первый раз, получает количество разделов для данной темы от zookeeper, затем устанавливает смещение каждого раздела на 0 и возвращает результат.

  • Сценарий 2: Потоковая задача, которая выполнялась в течение длительного времени, останавливается, и в заданную тему добавляется новый раздел.Метод обработки заключается в получении количества разделов для заданной темы из zookeeper.Для всех старых разделов offset по-прежнему использует то же значение в HBase.Save, и установите offset равным 0 для новых разделов.

  • Сценарий 3: Задача Streaming останавливается после долгого выполнения и в разделе темы нет изменений.В этом случае смещение, сохраненное в HBase, можно использовать напрямую.

После запуска приложения Spark Streaming, если в тему добавляется новый раздел, приложение может только читать данные в старом разделе, но не может читать новый. Поэтому, если вы хотите прочитать данные в новом разделе, вам необходимо перезапустить приложение Spark Streaming.

/* 
Returns last committed offsets for all the partitions of a given topic from HBase in  following  cases.
*/    

def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,

zkQuorum:String,zkRootDir:String,sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={

  val hbaseConf = HBaseConfiguration.create()

  val zkUrl = zkQuorum+"/"+zkRootDir

  val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout,connectionTimeOut)

  val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)

  val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size

  zkClientAndConnection._1.close()

  zkClientAndConnection._2.close()

  //Connect to HBase to retrieve last committed offsets

  val conn = ConnectionFactory.createConnection(hbaseConf)

  val table = conn.getTable(TableName.valueOf(hbaseTableName))

  val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +

                                              String.valueOf(System.currentTimeMillis())

  val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0

  val scan = new Scan()

  val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(

                                                   stopRow.getBytes).setReversed(true))

  val result = scanner.next()

  var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0

  if (result != null){

  //If the result from hbase scanner is not null, set number of partitions from hbase

  to the  number of cells

    hbaseNumberOfPartitionsForTopic = result.listCells().size()

  }

val fromOffsets = collection.mutable.Map[TopicPartition,Long]()

  if(hbaseNumberOfPartitionsForTopic == 0){

    // initialize fromOffsets to beginning

    for (partition <- 0 to zKNumberOfPartitionsForTopic-1){

      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)

    }

  } else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){

  // handle scenario where new partitions have been added to existing kafka topic

    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){

      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),

                                        Bytes.toBytes(partition.toString)))

      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)

    }

    for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){

      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)

    }

  } else {

  //initialize fromOffsets from last run

    for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){

      val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),

                                        Bytes.toBytes(partition.toString)))

      fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)

    }

  }

  scanner.close()

  conn.close()

  fromOffsets.toMap

}

Когда мы получим смещения, мы сможем создать Kafka Direct DStream.

val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,

                                   zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)

val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,

                           Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))

Вызывается после завершения обработки данных для этого пакета.saveOffsets()Сохранить смещения.

/*
For each RDD in a DStream apply a map transformation that processes the message.
*/

inputDStream.foreachRDD((rdd,batchTime) => {

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,

                        offset.untilOffset))

  val newRDD = rdd.map(message => processMessage(message))

  newRDD.count()

  saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)

})

Вы можете перейти на HBase, чтобы просмотреть данные смещения по различным темам и группам потребителей.

hbase(main):001:0> scan 'stream_kafka_offsets', {REVERSED => true}

ROW                                                COLUMN+CELL

 kafkablog2:groupid-1:1497628830000                column=offsets:0, timestamp=1497628832448, value=285

 kafkablog2:groupid-1:1497628830000                column=offsets:1, timestamp=1497628832448, value=285

 kafkablog2:groupid-1:1497628830000                column=offsets:2, timestamp=1497628832448, value=285

 kafkablog2:groupid-1:1497628770000                column=offsets:0, timestamp=1497628773773, value=225

 kafkablog2:groupid-1:1497628770000                column=offsets:1, timestamp=1497628773773, value=225

 kafkablog2:groupid-1:1497628770000                column=offsets:2, timestamp=1497628773773, value=225

 kafkablog1:groupid-2:1497628650000                column=offsets:0, timestamp=1497628653451, value=165

 kafkablog1:groupid-2:1497628650000                column=offsets:1, timestamp=1497628653451, value=165

 kafkablog1:groupid-2:1497628650000                column=offsets:2, timestamp=1497628653451, value=165

 kafkablog1:groupid-1:1497628530000                column=offsets:0, timestamp=1497628533108, value=120

 kafkablog1:groupid-1:1497628530000                column=offsets:1, timestamp=1497628533108, value=120

 kafkablog1:groupid-1:1497628530000                column=offsets:2, timestamp=1497628533108, value=120

4 row(s) in 0.5030 seconds

hbase(main):002:0>

Используется следующая версия примера кода

GroupID ArtifactID Version
org.apache.spark spark-streaming_2.11 2.1.0.cloudera1
org.apache.spark spark-streaming-kafka-0-10_2.11 2.1.0.cloudera1

github примера кода

Сохранить смещения в ZooKeeper

Это также более надежный способ использования Zookeeper для хранения смещений в приложениях Kafka, подключенных к Spark Streaming.

В этой схеме задача Spark Streaming будет обращаться к Zookeeper для чтения смещений каждого раздела при запуске. Если появится новый раздел, его смещение будет установлено в начале. После обработки каждого пакета данных пользователь должен иметь возможность выбрать смещение или последнее смещение для хранения обработанных данных. Кроме того, новый потребитель сохранит смещение в ZooKeeper, используя тот же формат, что и старый потребительский API Kafka. Таким образом, любой инструмент, который отслеживает или контролирует смещение Kafka в Zookeeper, будет по-прежнему работать.

Инициализируйте соединение Zookeeper, чтобы получить смещения от Zookeeper.

val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)

val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.

def readOffsets(topics: Seq[String], groupId:String):

 Map[TopicPartition, Long] = {

 val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

 val partitionMap = zkUtils.getPartitionsForTopics(topics)

 // /consumers/<groupId>/offsets/<topic>/

 partitionMap.foreach(topicPartitions => {

   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)

   topicPartitions._2.foreach(partition => {

     val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition

     try {

       val offsetStatTuple = zkUtils.readData(offsetPath)

       if (offsetStatTuple != null) {

         LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)

         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),

           offsetStatTuple._1.toLong)

       }

     } catch {

       case e: Exception =>

         LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)

         topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)

     }

   })

 })

 topicPartOffsetMap.toMap

}

Используйте полученные смещения для инициализации Kafka Direct DStream.

val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))

Вот как получить набор смещений из ZooKeeper

Примечание. Путь хранения смещения Kafka в ZooKeeper — /consumers/[groupId]/offsets/topic/[partitionId], а сохраненное значение — это offset

def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {

 offsets.foreach(or => {

   val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);

   val acls = new ListBuffer[ACL]()

   val acl = new ACL

   acl.setId(ANYONE_ID_UNSAFE)

   acl.setPerms(PERMISSIONS_ALL)

   acls += acl

   val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;

   val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset

   zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"

     + or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))

   LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)

 })

}

Кафка сама

Apache Spark 2.1.x и spark-streaming-kafka-0-10 используют новый потребительский API, который называется API асинхронной отправки. Вы можете использовать его после того, как убедитесь, что ваши обработанные данные хранятся правильно.commitAsync API(API асинхронной фиксации) для фиксации смещений в Kafka. Новый потребительский API будет использовать идентификатор группы потребителей в качестве уникального идентификатора для отправки смещений.

Отправить смещения в Kafka

stream.foreachRDD { rdd =>

  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

}

может прибытьstreaming-kafka-0-10-integrationузнать больше в

Примечание: commitAsync() интегрирован в версию kafka-0-10 Spark Streaming, и документация Spark напоминает, что это все еще экспериментальный API и есть возможность модификации.

другие методы

Стоит отметить, что вы также можете хранить смещения в HDFS. Но хранение смещений в HDFS не является популярным способом, потому что HDFS имеет немного большую задержку для ZooKeeper и Hbase. Кроме того, сохранение смещения каждого пакета данных в HDFS также вызовет проблему небольших файлов.

Не управляет смещениями

Для использования Spark Streaming управление смещениями не требуется. Например, таким приложениям, как мониторинг в реальном времени, нужны только текущие данные, и им не нужно управлять смещениями, чтобы гарантировать, что данные не будут потеряны. В этом случае вам вообще не нужно управлять смещениями, старые потребители kafka могутauto.offset.resetУстановите наибольшую или наименьшую величину, а новые потребители — на самую раннюю или последнюю.

если вы будетеauto.offset.resetУстановите наименьшее (самое раннее), тогда задача будет считывать данные с первого смещения, что эквивалентно повторному воспроизведению всех данных. Этот параметр заставит вашу задачу снова прочитать данные, которые все еще существуют в теме, при перезапуске задачи. Это будет зависеть от периода хранения вашего сообщения, чтобы определить, будете ли вы повторно использовать его.

И наоборот, если вы будетеauto.offset.resetУстановите наибольшую (последнюю), тогда ваше приложение начнет чтение с самого последнего смещения при запуске, что приведет к потере данных. Это будет зависеть от строгости и семантики данных вашего приложения и может быть жизнеспособным решением.

Суммировать

Способ, который мы обсуждали выше для управления смещениями, поможет вам эффективно управлять смещениями в вашем приложении Spark Streaming. Эти методы могут помочь пользователям лучше справляться со сценариями сбоя приложений и восстановления данных в приложениях, которые непрерывно вычисляют и хранят данные.

Дополнительные сведения см.Spark 2 Kafka Integration or Spark Streaming + Kafka Integration Guide.

Справочная документация по интеграции Spark с Kafka