Управление смещением Kafka для Spark Streaming

Kafka Spark

Эта статья в основном знакомит с содержанием, связанным с использованием сообщений Kafka при разработке приложений Spark Streaming. В статье основное внимание уделяется настройке среды разработки и реализации ручного управления смещениями Kafka.

1. Среда разработки

1. Версия компонента

  • Версия кластера CDH: 6.0.1
  • Искра версия: 2.2.0
  • Кафка версия: 1.0.1

2. Зависимости Maven

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

3. компиляция Скала

Добавьте плагин компиляции scala в плагины в узле сборки pom.xml.

<plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <executions>
      <execution>
        <goals>
          <goal>compile</goal>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
    <configuration>
      <scalaVersion>${scala.version}</scalaVersion>
      <args>
        <arg>-target:jvm-1.5</arg>
      </args>
    </configuration>
</plugin>

Заявление об упаковке Maven:mvn clean scala:compile compile package

4. Меры предосторожности при упаковке

Так как spark, spark-streaming, zookeeper и т. д. являются необходимыми компонентами в кластерах больших данных, соответствующие зависимости не нужно упаковывать в окончательный пакет jar, а его область действия может быть установлена ​​как «предоставленная»; в противном случае окончательный пакет jar будет довольно большим.

2. Смещение Кафки

1. Смещение (смещение)

Смещение здесь относится к смещению потребителя kafka.До версии Kafka 0.9 смещение потребителя по умолчанию сохранялось в zookeeper (/consumers/<group.id>/offsets/<topic>/<partitionId>Поэтому необходимо указать при инициализации потребителей.zookeeper.hosts.

При постоянном применении потребителей Kafka в реальных сценариях сообщество обнаружило, что старой версии потребителей нецелесообразно отправлять смещения в ZooKeeper. ZooKeeper по сути является просто компонентом службы координации, он не подходит в качестве компонента для хранения информации о перемещениях, ведь частые и высокопараллельные операции чтения/записи — это не то, в чем ZooKeeper хорош. Итак, начиная с версии 0.9 потребитель отправляет смещение во внутреннюю тему в Kafka (__consumer_offsets), тема по умолчанию имеет 50 разделов, каждый с 3 репликами.

2. Семантика доставки сообщений

  • at-most-once: не более одного раза сообщение может быть потеряно, но не будет повторно обработано;
  • at-least-once: хотя бы один раз сообщение не теряется, но может быть обработано несколько раз;
  • exactly-once: Ровно один раз сообщение должно быть обработано и только один раз.

Если потребитель отправляет смещение до потребления сообщения, то может быть достигнуто не более одного раза, потому что, если потребитель выйдет из строя между отправкой смещения и потреблением сообщения, потребитель начнет потреблять с новой позиции смещения после перезапуска. , и предыдущее сообщение будет использовано, потеряно; ​​и наоборот, семантика по крайней мере один раз может быть достигнута, если смещение фиксации после потребления сообщения. Поскольку Kafka не может гарантировать, что эти две операции могут быть выполнены в одной и той же транзакции, Kafka по умолчанию обеспечивает семантику обработки хотя бы один раз.

3. Способ подачи компенсации

По умолчанию потребитель автоматически отправляет смещение, а интервал автоматической отправки составляет 5 секунд, что можно установить, настроивauto.commit.interval.msПараметры могут управлять интервалом автоматической фиксации. Преимущество автоматической фиксации смещения заключается в том, что она снижает затраты пользователя на разработку, так что пользователю не приходится заниматься фиксацией смещения самостоятельно; пользователь может отправить с помощью ручного смещения).

Так называемая ручная отправка смещения заключается в том, что пользователь определяет, когда сообщение фактически обрабатывается, и может отправить перемещение.Пользователь может убедиться, что фактически обрабатывается только сообщение, а затем отправить перемещение. Это время не может быть гарантировано, если используется автоматическая фиксация смещения, поэтому в этом случае необходимо использовать ручную фиксацию смещения. Настроить смещение фиксации вручную очень просто, просто установите его при создании KafkaConsumer.enable.auto.commit=false, а затем вызовите метод commitSync или commitAsync.

3. Используйте Zookeeper для управления смещениями Kafka

1. Преимущества управляемых смещений Zookeeper

Хотя больше нет необходимости использовать zookeeper для управления смещениями в новой версии kafka, использование zookeeper для управления смещениями имеет следующие преимущества по сравнению с собственным управлением смещениями Kafka:

  1. Информацию о смещении можно легко просмотреть с помощью инструмента управления зоопарком;
  2. Сообщение можно прочитать с самого начала без изменения groupId;
  3. В особых случаях информацию о смещении можно изменить вручную.

С помощью инструмента управления zookeeper информация любого узла может быть изменена и удалена.Если вы хотите прочитать сообщение с начала, вам нужно только удалить данные узла в zk.

2. Реализация управления смещением Zookeeper

import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.JavaConverters._

class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot : () => String) {

  // 定义为 lazy 实现了懒汉式的单例模式,解决了序列化问题,方便使用 broadcast
  lazy val zkClient: ZkClient = getClient()
  lazy val zkRoot: String = getZkRoot()

  // offsetId = md5(groupId+join(topics))
  // 初始化偏移量的 zk 存储路径 zkRoot
  def initOffset(offsetId: String) : Unit = {
    if(!zkClient.exists(zkRoot)){
      zkClient.createPersistent(zkRoot, true)
    }
  }

  // 从 zkRoot 读取偏移量信息
  def getOffset(): Map[TopicPartition, Long] = {
    val keys = zkClient.getChildren(zkRoot)
    var initOffsetMap: Map[TopicPartition, Long] = Map()
    if(!keys.isEmpty){
      for (k:String <- keys.asScala) {
        val ks = k.split("!")
        val value:Long = zkClient.readData(zkRoot + "/" + k)
        initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value)
      }
    }
    initOffsetMap
  }

  // 根据单条消息,更新偏移量信息
  def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = {
    val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition
    zkClient.writeData(path, consumeRecord.offset())
    true
  }

  // 消费消息前,批量更新偏移量信息
  def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
      val path = zkRoot + "/" + offset.topic + "!" + offset.partition
      if(!zkClient.exists(path)){
        zkClient.createPersistent(path, offset.fromOffset)
      }
      else{
        zkClient.writeData(path, offset.fromOffset)
      }
    }
    true
  }

  // 消费消息后,批量提交偏移量信息
  def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
      val path = zkRoot + "/" + offset.topic + "!" + offset.partition
      if(!zkClient.exists(path)){
        zkClient.createPersistent(path, offset.untilOffset)
      }
      else{
        zkClient.writeData(path, offset.untilOffset)
      }
    }
    true
  }

  def finalize(): Unit = {
    zkClient.close()
  }
}

object ZkKafkaOffset{
  def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = {
    val getClient = () =>{
      val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181")
      new ZkClient(zkHost, 30000)
    }
    val getZkRoot = () =>{
      val zkRoot = "/kafka/ss/offset/" + offsetId
      zkRoot
    }
    new ZkKafkaOffset(getClient, getZkRoot)
  }
}

3. Spark Streaming использует сообщения Kafka

import scala.collection.JavaConverters._

object RtDataLoader {
  def main(args: Array[String]): Unit = {
    // 从配置文件读取 kafka 配置信息
    val props = new Props("xxx.properties")
    val groupId = props.getStr("groupId", "")
    if(StrUtil.isBlank(groupId)){
      StaticLog.error("groupId is empty")
      return
    }
    val kfkServers = props.getStr("kfk_servers")
    if(StrUtil.isBlank(kfkServers)){
      StaticLog.error("bootstrap.servers is empty")
      return
    }
    val topicStr = props.getStr("topics")
    if(StrUtil.isBlank(kfkServers)){
      StaticLog.error("topics is empty")
      return
    }

    // KAFKA 配置设定
    val topics = topicStr.split(",")
    val kafkaConf = Map[String, Object](
      "bootstrap.servers" -> kfkServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "receive.buffer.bytes" -> (102400: java.lang.Integer),
      "max.partition.fetch.bytes" -> (5252880: java.lang.Integer),
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]")

    // streaming 相关配置
    conf.set("spark.streaming.stopGracefullyOnShutdown","true")
    conf.set("spark.streaming.backpressure.enabled","true")
    conf.set("spark.streaming.backpressure.initialRate","1000")

    // 设置 zookeeper 连接信息
    conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "sky-01:2181"))

    // 创建 StreamingContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    // 根据 groupId 和 topics 获取 offset
    val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
    val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId)
    kafkaOffset.initOffset(ssc, offsetId)
    val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)

    // 创建数据流
    var stream:InputDStream[ConsumerRecord[String, String]] = null
    if(topicStr.contains("*")) {
      StaticLog.warn("使用正则匹配读取 kafka 主题:" + topicStr)
      stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
    }
    else {
      StaticLog.warn("待读取的 kafka 主题:" + topicStr)
      stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
    }

    // 消费数据
    stream.foreachRDD(rdd => {
      // 消息消费前,更新 offset 信息
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaOffset.updateOffset(offsetRanges)
      
      //region 处理详情数据
      StaticLog.info("开始处理 RDD 数据!")
      //endregion
      
      // 消息消费结束,提交 offset 信息
      kafkaOffset.commitOffset(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

4. Вопросы, требующие внимания

auto.offset.reset

заauto.offset.resetЛично рекомендую установить самое раннее.__consumer_offsetsСоответствующей информации о смещении нет, поэтому сообщение считывается с самого начала, при повторном запуске из-за__consumer_offsetsИнформация о смещении для потребления уже есть, поэтому она будет основываться на__consumer_offsetsИнформация о смещении, записанная в продолжении чтения данных.

Кроме того, для использования zookeeper для управления смещениями очень удобно просто удалить соответствующий узел, и данные можно будет прочитать с самого начала. Однако, если вы хотите читать данные из самого последнего места и вам не нужно читать старые сообщения, вы можете установить для него последнее.

Регулярная подписка на темы Кафки

Основываясь на темах обычной подписки, есть следующие преимущества:

  • Нет необходимости перечислять названия предметов, достаточно одной-двух тем, если их десятки, перечислять слишком хлопотно;
  • Можно добиться эффекта динамической подписки (будут читаться и новые регулярные темы).
stream = KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))

Проблемы с сериализацией SparkStreaming

Каждый, кто разрабатывает программы SparkStreaming, столкнется с различными проблемами сериализации, проще говоря: переменные или объекты, используемые в драйвере, не нужно сериализовать, а переменные или объекты, передаваемые в экстрактор, нужно сериализовать. Поэтому рекомендуемая практика заключается в том, что лучше всего обрабатывать преобразование данных только в эксекторе, а обработанные результаты сохранять в драйвере.

stream.foreachRDD(rdd => {
  // driver 代码运行区域
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaOffset.updateOffset(offsetRanges)
  
  // exector 代码运行区域
  val resultRDD = rdd.map(xxxxxxxx)
  //endregion
  
  //对结果进行存储
  resultRDD.saveToES(xxxxxx)
  kafkaOffset.commitOffset(offsetRanges)
})

Некоторые концепции в статье взяты из "Кафки в действии", очень хорошая книга, рекомендую.


Any Code,Code Any!

Отсканируйте код, чтобы следовать «AnyCode», давайте вместе двигаться вперед по пути программирования.