1. Описание версии
Spark предоставляет два интеграционных решения для разных версий Kafka:spark-streaming-kafka-0-8
а такжеspark-streaming-kafka-0-10
, основные отличия заключаются в следующем:
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Кафка версия | 0.8.2.1 or higher | 0.10.0 or higher |
статус точки доступа | Deprecated Поддержка Kafka 0.8 устарела с момента выпуска Spark 2.3.0. |
Стабильная (стабильная версия) |
языковая поддержка | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit API (офсетная фиксация) | No | Yes |
Dynamic Topic Subscription (Динамическая подписка на тему) |
No | Yes |
Версия Кафки, используемая в этой статье,kafka_2.12-2.2.0
, поэтому для интегрирования используется второй метод.
2. Зависимости проекта
Проект построен с помощью Maven, и основные зависимости следующие:
<properties>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<!-- Spark Streaming-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming 整合 Kafka 依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>2.4.3</version>
</dependency>
</dependencies>
См. этот репозиторий для получения полного исходного кода:spark-streaming-kafka
3. Интегрируйте Кафку
позвонивKafkaUtils
объектcreateDirectStream
способ создания входного потока, полный код выглядит следующим образом:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* spark streaming 整合 kafka
*/
object KafkaDirectStream {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, Object](
/*
* 指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找其他 broker 的信息。
* 不过建议至少提供两个 broker 的信息作为容错。
*/
"bootstrap.servers" -> "hadoop001:9092",
/*键的序列化器*/
"key.deserializer" -> classOf[StringDeserializer],
/*值的序列化器*/
"value.deserializer" -> classOf[StringDeserializer],
/*消费者所在分组的 ID*/
"group.id" -> "spark-streaming-group",
/*
* 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
* latest: 在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
* earliest: 在偏移量无效的情况下,消费者将从起始位置读取分区的记录
*/
"auto.offset.reset" -> "latest",
/*是否自动提交*/
"enable.auto.commit" -> (true: java.lang.Boolean)
)
/*可以同时订阅多个主题*/
val topics = Array("spark-streaming-topic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
/*位置策略*/
PreferConsistent,
/*订阅主题*/
Subscribe[String, String](topics, kafkaParams)
)
/*打印输入流*/
stream.map(record => (record.key, record.value)).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
3.1 ConsumerRecord
Каждая запись во входном потоке, полученном здесь, на самом делеConsumerRecord<K, V>
, который содержит всю доступную информацию о Record, исходный код выглядит следующим образом:
public class ConsumerRecord<K, V> {
public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
/*主题名称*/
private final String topic;
/*分区编号*/
private final int partition;
/*偏移量*/
private final long offset;
/*时间戳*/
private final long timestamp;
/*时间戳代表的含义*/
private final TimestampType timestampType;
/*键序列化器*/
private final int serializedKeySize;
/*值序列化器*/
private final int serializedValueSize;
/*值序列化器*/
private final Headers headers;
/*键*/
private final K key;
/*值*/
private final V value;
.....
}
3.2 Свойства производителя
в примере кодаkafkaParams
Он инкапсулирует свойства потребителей Kafka.Эти свойства не имеют ничего общего со Spark Streaming и определены в собственном API Kafka. Адрес сервера, сериализатор ключей и сериализатор значений являются обязательными, а другие конфигурации необязательны. Остальные необязательные элементы конфигурации следующие:
1. fetch.min.byte
Минимальное количество байтов, которое потребитель может получить с сервера. Если количество доступных данных меньше установленного значения, брокер будет ждать достаточного количества доступных данных, прежде чем вернуть их потребителю.
2. fetch.max.wait.ms
Время ожидания, пока брокер вернет данные потребителю.
3. max.partition.fetch.bytes
Максимальное количество байтов, возвращаемых потребителю разделом.
4. session.timeout.ms
Количество времени, в течение которого потребитель может отключиться от сервера, прежде чем он будет считаться мертвым.
5. auto.offset.reset
Этот атрибут указывает, что должен делать потребитель при чтении раздела без смещения или если смещение неверно:
- последний (по умолчанию): в случае недопустимого смещения потребитель начнет чтение данных из последней записи, сгенерированной после ее запуска;
- самый ранний: в случае, если смещение неверно, потребитель будет читать записи раздела с начальной позиции.
6. enable.auto.commit
Следует ли автоматически фиксировать смещение, значение по умолчанию равно true, во избежание дублирования данных и потери данных можно установить значение false.
7. client.id
Идентификатор клиента, используемый сервером для идентификации источника сообщения.
8. max.poll.records
одиночный вызовpoll()
Количество записей, которые может вернуть метод.
9. получить.буфер.байт и отправить.буфер.байт
Эти два параметра определяют размер буферов приема и отправки пакетов сокета TCP соответственно, а -1 представляет собой значение по умолчанию для операционной системы.
3.3 Стратегия местоположения
В Spark Streaming предусмотрены следующие три стратегии расположения для указания отношения распределения между разделами тем Kafka и исполнителями Spark:
-
PreferConsistent: равномерно распределяет разделы по всем Исполнителям;
-
PreferBrokers: этот параметр можно выбрать, когда исполнитель Spark и брокер Kafka находятся на одном компьютере, и он предпочтительно назначает ведущий раздел на этом брокере исполнителю на этом компьютере;
-
PreferFixed: вы можете указать отношение сопоставления между разделами темы и конкретными хостами, а также явно назначить разделы конкретным хостам.Конструктор выглядит следующим образом:
@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
new PreferFixed(hostMap)
3.4 Метод подписки
Spark Streaming предоставляет два метода подписки на темы, а именноSubscribe
а такжеSubscribePattern
. Последний может использовать обычные совпадения для подписки на название темы. Его конструкторы следующие:
/**
* @param 需要订阅的主题的集合
* @param Kafka 消费者参数
* @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
*/
def Subscribe[K, V](
topics: ju.Collection[jl.String],
kafkaParams: ju.Map[String, Object],
offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }
/**
* @param 需要订阅的正则
* @param Kafka 消费者参数
* @param offsets(可选): 在初始启动时开始的偏移量。如果没有,则将使用保存的偏移量或 auto.offset.reset 属性的值
*/
def SubscribePattern[K, V](
pattern: ju.regex.Pattern,
kafkaParams: collection.Map[String, Object],
offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }
В примере кода мы фактически не указываем третий параметрoffsets
, поэтому программа использует конфигурацию по умолчаниюauto.offset.reset
Значение атрибута last, то есть в случае недопустимого смещения потребитель начнет чтение данных с последней записи, сгенерированной после ее запуска.
3.5 Фиксация смещения
В примере кода мы будемenable.auto.commit
Установите значение true для автоматической фиксации. В некоторых случаях вам может потребоваться более высокая надежность, например, фиксация смещений после того, как бизнес полностью обработан, и в этом случае вы можете использовать ручную фиксацию. Чтобы зафиксировать вручную, вам нужно вызвать собственный API Kafka:
-
commitSync
: для асинхронной отправки; -
commitAsync
: для синхронных коммитов.
Чтобы узнать о конкретных способах отправки, см.: [Подробное объяснение потребителей Kafka](GitHub.com/Black and WhiteShould/…Детали потребителя.мд)
В-четвертых, запустите тест
4.1 Создать тему
1. Запустите Кафку
Работа Kafka зависит от zookeeper, который нужно запустить заранее.Вы можете запустить встроенный zookeeper Kafka, или можете запустить собственную установку:
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
Запускаем одноузловую кафку для тестирования:
# bin/kafka-server-start.sh config/server.properties
2. Создать тему
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 \
--partitions 1 \
--topic spark-streaming-topic
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. Создайте производителя
Здесь создается производитель Kafka для отправки тестовых данных:
bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic
4.2 Тест локального режима
Здесь я напрямую использую локальный режим для запуска программы Spark Streaming. Используйте производителя для отправки данных после запуска и просмотра результатов из консоли.
Из вывода консоли вы можете видеть, что поток данных был успешно получен из-заkafka-console-producer.sh
Отправляемые данные по умолчанию не имеют ключа, поэтому значение ключа равно null. При этом по выводу также видно, что указанное в программеgroupId
и автоматически назначается программойclientId
.
использованная литература
Дополнительные статьи серии о больших данных см. в проекте с открытым исходным кодом GitHub.:Руководство для начинающих по большим данным