Приложения Spark Streaming часто получают информацию от Kafka. Чтение непрерывных данных из Kafka будет иметь много преимуществ, таких как хорошая производительность и скорость. Однако пользователи должны управлять смещениями Kafka, чтобы гарантировать, что приложение Spark Streaming сможет правильно считывать данные после его смерти. В этой статье мы обсудим, как управлять смещениями в будущем.
содержание
- Обзор управления смещением
- Сохранение смещений во внешней системе
- Spark Streaming checkpoints
- Сохранить смещения в HBase
- Сохранить смещения в ZooKeeper
- Хранить смещения в самой Kafka
- другие методы
- Суммировать
Обзор управления смещением
Spark Streaming интегрируется с Kafka, чтобы пользователи могли читать данные из одной или нескольких тем из Kafka. Тема Kafka содержит несколько разделов, в которых хранятся сообщения. Сообщения в каждом разделе хранятся последовательно и помечаются смещением (которое можно рассматривать как позицию). Разработчик может управлять позицией чтения данных с помощью смещения в своем приложении Spark Streaming, но для этого требуется хороший механизм управления смещением.
Управление смещениями очень полезно для обеспечения согласованности данных на протяжении всего жизненного цикла потоковых приложений. Например, если смещение не сохраняется в постоянной базе данных до того, как приложение остановится или завершит работу с ошибкой, диапазоны смещений будут потеряны. Кроме того, если смещение, прочитанное каждым разделом, не сохраняется, то Spark Streaming не может продолжить чтение сообщений с того места, где оно было отключено (остановлено или вызвано ошибкой).
Spark-Streaming-flow-for-offsets.png
На приведенной выше диаграмме показан обычный процесс смещения управления приложениями Spark Streaming. Смещениями можно управлять несколькими способами, но обычно следуйте приведенным ниже шагам:
- При инициализации Direct DStream необходимо указать смещение, содержащее каждый раздел каждой темы, чтобы Direct DStream мог считывать данные из указанного расположения.
- offsets — это позиция смещения, сохраненная на шаге 4.
- читать и обрабатывать сообщения
- Сохранение данных результата после обработки
- с пунктирным кругомСохранить и зафиксировать смещениеПросто подчеркните, что пользователи могут выполнять ряд действий, чтобы удовлетворить свои более строгие семантические требования. Сюда входят идемпотентные операции и сохранение смещений посредством атомарных операций.
- Наконец, сохраните смещения во внешней постоянной базе данных, такой как HBase, Kafka, HDFS и ZooKeeper.
Различные решения могут быть объединены в соответствии с различными потребностями бизнеса. Spark имеет удобную парадигму программирования, которая позволяет пользователю контролировать сохранение смещений. Внимательно рассмотрите следующую ситуацию: приложение Spark Streaming считывает данные из Kafka, обрабатывает или преобразует данные, а затем отправляет данные в другую тему или другую систему (например, другие системы обмена сообщениями, Hbase, Solr, СУБД и т. д.). В этом примере мы рассматриваем только сообщения, обработанные и отправленные в другие системы.
Сохранение смещений во внешней системе
В этой главе в будущем мы рассмотрим различные варианты внешнего постоянного хранилища.
Чтобы лучше понять, о чем говорится в этой главе, давайте сделаем несколько предварительных выводов. Если используете spark-streaming-kafka-0-10, то рекомендуем ставитьenable.auto.commit
Установите значение «ложь». Эта конфигурация действует только в этой версии,enable.auto.commit
Если установлено значение true, это означает, что смещения будут следоватьauto.commit.interval.ms
Настроенный интервал автоматически периодически отправляется в Kafka. В Spark Streaming установка для этого параметра значения true приведет к автоматической фиксации приложения Spark после чтения данных из kafka, а не после обработки данных, чего мы не хотим. Поэтому, чтобы лучше контролировать фиксацию смещений, мы рекомендуем поставитьenable.auto.commit
Установите значение «ложь».
Spark Streaming checkpoints
Создание контрольных точек с помощью Spark Streaming — это простейший метод хранения, который легко реализовать в среде Spark. Контрольные точки Spark Streaming предназначены для сохранения состояния приложения, и мы перенаправим его в HDFS, чтобы иметь возможность восстанавливать данные после сбоев.
Выполнение операции контрольной точки в Kafka Stream делает смещение сохраненным в контрольной точке.Если приложение зависает, функция приложения SparkStreamig может начать чтение сообщений с сохраненного смещения. Однако, если вы обновляете приложение Spark Streaming, то, извините, данные, которые не могут быть проверены, не могут быть использованы, поэтому этот механизм не является надежным, особенно в строгой производственной среде, мы не рекомендуем этот метод.
Сохранить смещения в HBase
HBase можно использовать в качестве надежной внешней базы данных для сохранения смещений. Сохраняя смещения во внешней системе, приложения Spark Streaming могут повторно считывать или воспроизводить любые данные, все еще хранящиеся в Kafka.
Согласно шаблону проектирования HBase, он позволяет приложениям хранить несколько приложений Spark Streaming и несколько тем Kafka в таблице со структурой строки и столбца. В этом примере таблица названа с именем темы, идентификатором группы потребителей и Spark Streaming.batchTime.milliSeconds
Как rowkey для уникальной идентификации. несмотря на то чтоbatchTime.milliSeconds
Не требуется, но дает лучшее представление исторических смещений для каждой партии. Таблица будет хранить накопленные данные за 30 дней и будет удалена, если она превысит 30 дней. Ниже приведен DDL и структура для создания таблицы.
DDL
create 'stream_kafka_offsets', {NAME=>'offsets', TTL=>2592000}
RowKey Layout:
row: <TOPIC_NAME>:<GROUP_ID>:<EPOCH_BATCHTIME_MS>
column family: offsets
qualifier: <PARTITION_ID>
value: <OFFSET_ID>
Для каждого пакета сообщений используйтеsaveOffsets()
Сохраните смещения, считанные из указанной темы, в HBase.
/*
Save offsets for each batch into HBase
*/
def saveOffsets(TOPIC_NAME:String,GROUP_ID:String,offsetRanges:Array[OffsetRange],
hbaseTableName:String,batchTime: org.apache.spark.streaming.Time) ={
val hbaseConf = HBaseConfiguration.create()
hbaseConf.addResource("src/main/resources/hbase-site.xml")
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(hbaseTableName))
val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" +String.valueOf(batchTime.milliseconds)
val put = new Put(rowKey.getBytes)
for(offset <- offsetRanges){
put.addColumn(Bytes.toBytes("offsets"),Bytes.toBytes(offset.partition.toString),
Bytes.toBytes(offset.untilOffset.toString))
}
table.put(put)
conn.close()
}
Перед выполнением потоковой задачи сначала используйтеgetLastCommittedOffsets()
чтобы прочитать смещения, сохраненные в конце последней задачи из HBase. Этот метод будет использовать обычную схему для возврата смещения раздела темы kafka.
-
Сценарий 1. Задача Streaming запускается в первый раз, получает количество разделов для данной темы от zookeeper, затем устанавливает смещение каждого раздела на 0 и возвращает результат.
-
Сценарий 2: Потоковая задача, которая выполнялась в течение длительного времени, останавливается, и в заданную тему добавляется новый раздел.Метод обработки заключается в получении количества разделов для заданной темы из zookeeper.Для всех старых разделов offset по-прежнему использует то же значение в HBase.Save, и установите offset равным 0 для новых разделов.
-
Сценарий 3: Задача Streaming останавливается после долгого выполнения и в разделе темы нет изменений.В этом случае смещение, сохраненное в HBase, можно использовать напрямую.
После запуска приложения Spark Streaming, если в тему добавляется новый раздел, приложение может только читать данные в старом разделе, но не может читать новый. Поэтому, если вы хотите прочитать данные в новом разделе, вам необходимо перезапустить приложение Spark Streaming.
/*
Returns last committed offsets for all the partitions of a given topic from HBase in following cases.
*/
def getLastCommittedOffsets(TOPIC_NAME:String,GROUP_ID:String,hbaseTableName:String,
zkQuorum:String,zkRootDir:String,sessionTimeout:Int,connectionTimeOut:Int):Map[TopicPartition,Long] ={
val hbaseConf = HBaseConfiguration.create()
val zkUrl = zkQuorum+"/"+zkRootDir
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout,connectionTimeOut)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2,false)
val zKNumberOfPartitionsForTopic = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
zkClientAndConnection._1.close()
zkClientAndConnection._2.close()
//Connect to HBase to retrieve last committed offsets
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(hbaseTableName))
val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" +
String.valueOf(System.currentTimeMillis())
val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
val scan = new Scan()
val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(
stopRow.getBytes).setReversed(true))
val result = scanner.next()
var hbaseNumberOfPartitionsForTopic = 0 //Set the number of partitions discovered for a topic in HBase to 0
if (result != null){
//If the result from hbase scanner is not null, set number of partitions from hbase
to the number of cells
hbaseNumberOfPartitionsForTopic = result.listCells().size()
}
val fromOffsets = collection.mutable.Map[TopicPartition,Long]()
if(hbaseNumberOfPartitionsForTopic == 0){
// initialize fromOffsets to beginning
for (partition <- 0 to zKNumberOfPartitionsForTopic-1){
fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
}
} else if(zKNumberOfPartitionsForTopic > hbaseNumberOfPartitionsForTopic){
// handle scenario where new partitions have been added to existing kafka topic
for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1){
val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
Bytes.toBytes(partition.toString)))
fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
}
for (partition <- hbaseNumberOfPartitionsForTopic to zKNumberOfPartitionsForTopic-1){
fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> 0)
}
} else {
//initialize fromOffsets from last run
for (partition <- 0 to hbaseNumberOfPartitionsForTopic-1 ){
val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
Bytes.toBytes(partition.toString)))
fromOffsets += (new TopicPartition(TOPIC_NAME,partition) -> fromOffset.toLong)
}
}
scanner.close()
conn.close()
fromOffsets.toMap
}
Когда мы получим смещения, мы сможем создать Kafka Direct DStream.
val fromOffsets= getLastCommittedOffsets(topic,consumerGroupID,hbaseTableName,zkQuorum,
zkKafkaRootDir,zkSessionTimeOut,zkConnectionTimeOut)
val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,
Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets))
Вызывается после завершения обработки данных для этого пакета.saveOffsets()
Сохранить смещения.
/*
For each RDD in a DStream apply a map transformation that processes the message.
*/
inputDStream.foreachRDD((rdd,batchTime) => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,
offset.untilOffset))
val newRDD = rdd.map(message => processMessage(message))
newRDD.count()
saveOffsets(topic,consumerGroupID,offsetRanges,hbaseTableName,batchTime)
})
Вы можете перейти на HBase, чтобы просмотреть данные смещения по различным темам и группам потребителей.
hbase(main):001:0> scan 'stream_kafka_offsets', {REVERSED => true}
ROW COLUMN+CELL
kafkablog2:groupid-1:1497628830000 column=offsets:0, timestamp=1497628832448, value=285
kafkablog2:groupid-1:1497628830000 column=offsets:1, timestamp=1497628832448, value=285
kafkablog2:groupid-1:1497628830000 column=offsets:2, timestamp=1497628832448, value=285
kafkablog2:groupid-1:1497628770000 column=offsets:0, timestamp=1497628773773, value=225
kafkablog2:groupid-1:1497628770000 column=offsets:1, timestamp=1497628773773, value=225
kafkablog2:groupid-1:1497628770000 column=offsets:2, timestamp=1497628773773, value=225
kafkablog1:groupid-2:1497628650000 column=offsets:0, timestamp=1497628653451, value=165
kafkablog1:groupid-2:1497628650000 column=offsets:1, timestamp=1497628653451, value=165
kafkablog1:groupid-2:1497628650000 column=offsets:2, timestamp=1497628653451, value=165
kafkablog1:groupid-1:1497628530000 column=offsets:0, timestamp=1497628533108, value=120
kafkablog1:groupid-1:1497628530000 column=offsets:1, timestamp=1497628533108, value=120
kafkablog1:groupid-1:1497628530000 column=offsets:2, timestamp=1497628533108, value=120
4 row(s) in 0.5030 seconds
hbase(main):002:0>
Используется следующая версия примера кода
GroupID | ArtifactID | Version |
---|---|---|
org.apache.spark | spark-streaming_2.11 | 2.1.0.cloudera1 |
org.apache.spark | spark-streaming-kafka-0-10_2.11 | 2.1.0.cloudera1 |
Сохранить смещения в ZooKeeper
Это также более надежный способ использования Zookeeper для хранения смещений в приложениях Kafka, подключенных к Spark Streaming.
В этой схеме задача Spark Streaming будет обращаться к Zookeeper для чтения смещений каждого раздела при запуске. Если появится новый раздел, его смещение будет установлено в начале. После обработки каждого пакета данных пользователь должен иметь возможность выбрать смещение или последнее смещение для хранения обработанных данных. Кроме того, новый потребитель сохранит смещение в ZooKeeper, используя тот же формат, что и старый потребительский API Kafka. Таким образом, любой инструмент, который отслеживает или контролирует смещение Kafka в Zookeeper, будет по-прежнему работать.
Инициализируйте соединение Zookeeper, чтобы получить смещения от Zookeeper.
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.
def readOffsets(topics: Seq[String], groupId:String):
Map[TopicPartition, Long] = {
val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
val partitionMap = zkUtils.getPartitionsForTopics(topics)
// /consumers/<groupId>/offsets/<topic>/
partitionMap.foreach(topicPartitions => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
topicPartitions._2.foreach(partition => {
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
try {
val offsetStatTuple = zkUtils.readData(offsetPath)
if (offsetStatTuple != null) {
LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
offsetStatTuple._1.toLong)
}
} catch {
case e: Exception =>
LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
}
})
})
topicPartOffsetMap.toMap
}
Используйте полученные смещения для инициализации Kafka Direct DStream.
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))
Вот как получить набор смещений из ZooKeeper
Примечание. Путь хранения смещения Kafka в ZooKeeper — /consumers/[groupId]/offsets/topic/[partitionId], а сохраненное значение — это offset
def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
offsets.foreach(or => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
val acls = new ListBuffer[ACL]()
val acl = new ACL
acl.setId(ANYONE_ID_UNSAFE)
acl.setPerms(PERMISSIONS_ALL)
acls += acl
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"
+ or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))
LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
})
}
Кафка сама
Apache Spark 2.1.x и spark-streaming-kafka-0-10 используют новый потребительский API, который называется API асинхронной отправки. Вы можете использовать его после того, как убедитесь, что ваши обработанные данные хранятся правильно.commitAsync API
(API асинхронной фиксации) для фиксации смещений в Kafka. Новый потребительский API будет использовать идентификатор группы потребителей в качестве уникального идентификатора для отправки смещений.
Отправить смещения в Kafka
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
может прибытьstreaming-kafka-0-10-integrationузнать больше в
Примечание: commitAsync() интегрирован в версию kafka-0-10 Spark Streaming, и документация Spark напоминает, что это все еще экспериментальный API и есть возможность модификации.
другие методы
Стоит отметить, что вы также можете хранить смещения в HDFS. Но хранение смещений в HDFS не является популярным способом, потому что HDFS имеет немного большую задержку для ZooKeeper и Hbase. Кроме того, сохранение смещения каждого пакета данных в HDFS также вызовет проблему небольших файлов.
Не управляет смещениями
Для использования Spark Streaming управление смещениями не требуется. Например, таким приложениям, как мониторинг в реальном времени, нужны только текущие данные, и им не нужно управлять смещениями, чтобы гарантировать, что данные не будут потеряны. В этом случае вам вообще не нужно управлять смещениями, старые потребители kafka могутauto.offset.reset
Установите наибольшую или наименьшую величину, а новые потребители — на самую раннюю или последнюю.
если вы будетеauto.offset.reset
Установите наименьшее (самое раннее), тогда задача будет считывать данные с первого смещения, что эквивалентно повторному воспроизведению всех данных. Этот параметр заставит вашу задачу снова прочитать данные, которые все еще существуют в теме, при перезапуске задачи. Это будет зависеть от периода хранения вашего сообщения, чтобы определить, будете ли вы повторно использовать его.
И наоборот, если вы будетеauto.offset.reset
Установите наибольшую (последнюю), тогда ваше приложение начнет чтение с самого последнего смещения при запуске, что приведет к потере данных. Это будет зависеть от строгости и семантики данных вашего приложения и может быть жизнеспособным решением.
Суммировать
Способ, который мы обсуждали выше для управления смещениями, поможет вам эффективно управлять смещениями в вашем приложении Spark Streaming. Эти методы могут помочь пользователям лучше справляться со сценариями сбоя приложений и восстановления данных в приложениях, которые непрерывно вычисляют и хранят данные.
Дополнительные сведения см.Spark 2 Kafka Integration or Spark Streaming + Kafka Integration Guide.