1. Введение
Подобно Spring Data Redis, Spring Data MongoDB, Spring Data JPA и другим проектам, Spring Kafka предоставляет способ доступа к кластерам Kafka посредством простой настройки в приложениях Spring.
В этой статье я расскажу, как производители сообщений отправляют сообщения в кластеры Kafka в приложениях Spring, как потребители сообщений потребляют сообщения, как потребляют сообщения пакетами и как несколько групп потребителей потребляют сообщения одновременно.
Следует отметить, что для использования новейших функций Spring Kafka следующий тестовый код собран с использованием Spring Boot 2.0.0, и доступны все адреса ссылок на исходный код:git ee.com/word way can/K…
Два основных использования Spring Kafka
(1) Добавьте зависимости в pom.xml:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Примечание. Поскольку для сборки проекта я использовал Spring Boot, я не указывал конкретную версию Spring Kafka при добавлении зависимостей (фактическая версия пакета jar — 2.1.x). Подробный файл pom.xml может относиться к:git ee.com/word way can/K…
(2) Базовая конфигурация:
i) Если вы создаете свой проект с помощью Spring Boot, вы можете просто добавить следующую конфигурацию в файл свойств:
#kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties
#指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
#指定默认topic id
spring.kafka.template.default-topic=topic-test
#指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
#每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#指定默认消费者group id
spring.kafka.consumer.group-id=myGroup1
#若设置为earliest,那么会从头开始读partition
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
ii) Если вы используете обычный Maven для сборки проекта или хотите настроить дополнительную конфигурацию, вы можете использовать следующую конфигурацию JavaConfig:
package cn.zifangsky.kafkademo.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
/**
* Kafka配置
* @author zifangsky
*/
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.producer.bootstrapServers}")
private String producerBootstrapServers; //生产者连接Server地址
@Value("${kafka.producer.retries}")
private String producerRetries; //生产者重试次数
@Value("${kafka.producer.batchSize}")
private String producerBatchSize;
@Value("${kafka.producer.lingerMs}")
private String producerLingerMs;
@Value("${kafka.producer.bufferMemory}")
private String producerBufferMemory;
@Value("${kafka.consumer.bootstrapServers}")
private String consumerBootstrapServers;
@Value("${kafka.consumer.groupId}")
private String consumerGroupId;
@Value("${kafka.consumer.enableAutoCommit}")
private String consumerEnableAutoCommit;
@Value("${kafka.consumer.autoCommitIntervalMs}")
private String consumerAutoCommitIntervalMs;
@Value("${kafka.consumer.sessionTimeoutMs}")
private String consumerSessionTimeoutMs;
@Value("${kafka.consumer.maxPollRecords}")
private String consumerMaxPollRecords;
@Value("${kafka.consumer.autoOffsetReset}")
private String consumerAutoOffsetReset;
/**
* ProducerFactory
* @return
*/
@Bean
public ProducerFactory<Object, Object> producerFactory(){
Map<String, Object> configs = new HashMap<String, Object>(); //参数
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaProducerFactory<Object, Object>(configs);
}
/**
* KafkaTemplate
* @param producerFactory
* @return
*/
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate(){
return new KafkaTemplate<Object, Object>(producerFactory(), true);
}
/**
* ConsumerFactory
* @return
*/
@Bean
public ConsumerFactory<Object, Object> consumerFactory(){
Map<String, Object> configs = new HashMap<String, Object>(); //参数
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<Object, Object>(configs);
}
/**
* 添加KafkaListenerContainerFactory,用于批量消费消息
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> batchContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setConcurrency(4);
containerFactory.setBatchListener(true); //批量消费
containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return containerFactory;
}
}
Примечание. Используемые выше свойства:
kafka.producer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.producer.retries=3
#16K
kafka.producer.batchSize=16384
kafka.producer.lingerMs=1
#32M
kafka.producer.bufferMemory=33554432
kafka.consumer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.consumer.groupId=0
kafka.consumer.enableAutoCommit=false
kafka.consumer.autoCommitIntervalMs=1000
kafka.consumer.sessionTimeoutMs=30000
kafka.consumer.maxPollRecords=100
#earliest,latest
kafka.consumer.autoOffsetReset=earliest
Примечание. Подробное значение этих свойств см. в официальной документации:kafka.apache.org/document ATI…
(3) Первый экземпляр создания/потребления сообщения:
i) Производитель сообщения:
package cn.zifangsky.kafkademo.producer;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* 消息生产者的第一个示例
* @author zifangsky
*/
@Component("simpleProducer")
public class SimpleProducer {
private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
* 使用KafkaTemplate向Kafka推送数据
* @param topicName topic
* @param data
*/
public void sendMessage(String topicName,String data){
logger.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
try {
kafkaTemplate.send(topicName, data);
logger.info("推送数据成功!");
} catch (Exception e) {
logger.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
,topicName,data));
}
}
}
ii) вызвать контроллер:
package cn.zifangsky.kafkademo.controller;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import cn.zifangsky.kafkademo.producer.SimpleProducer;
@RestController
@RequestMapping("/kafka")
public class TestKafkaController {
@Resource(name="simpleProducer")
private SimpleProducer producer;
private final String TOPIC = "topic-test"; //测试使用topic
@RequestMapping("/send")
public String send(String data){
producer.sendMessage(TOPIC, data);
return "发送数据【" + data + "】成功!";
}
}
iii) Потребители сообщений:
package cn.zifangsky.kafkademo.consumer;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import cn.zifangsky.kafkademo.producer.SimpleProducer;
/**
* 消息消费者的第一个示例
* @author zifangsky
*/
@Component("simpleConsumer")
public class SimpleConsumer {
private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
@KafkaListener(id="test",topics={"topic-test"})
public void listen(String data){
System.out.println("SimpleConsumer收到消息:" + data);
logger.info(MessageFormat.format("SimpleConsumer收到消息:{0}", data));
}
}
г) Тест:
После запуска проекта зайдите в браузере: http://127.0.0.1:9090/kafka/send?data=hahaha 1111
Вывод в консоли следующий:
Можно обнаружить, что этот простейший пример уже может работать нормально.
(4) Отправка/получение сообщений пользовательского типа:
В приведенных выше примерах все сообщения, которые мы отправляем/получаем, представляют собой простые строки, и суть заключается в использовании StringDeserializer и StringDeserializer для кодирования и декодирования сообщений. Однако в реальной разработке иногда может потребоваться отправить более сложные сообщения (например, мы хотим отправить сообщение с описанием погодных условий в определенной области, которое включает в себя несколько параметров, таких как температура, влажность, индекс загрязнения и т. д.). . В этом случае у нас обычно есть два способа добиться этого:
- Способ 1. Преобразуйте объект Java в строку JSON перед отправкой сообщения, а затем отправьте его в кластер Kafka.
- Способ 2: настроить кодировщик и декодер сообщений, напрямую отправлять объекты Java
i) Пользовательский кодировщик и декодер сообщений:
package cn.zifangsky.kafkademo.common;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;
public class ObjectSerializer implements Serializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
/**
* 序列化
*/
@Override
public byte[] serialize(String topic, Object data) {
return SerializationUtils.serialize(data);
}
@Override
public void close() {
}
}
package cn.zifangsky.kafkademo.common;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.util.SerializationUtils;
public class ObjectDeserializer implements Deserializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
/**
* 反序列化
*/
@Override
public Object deserialize(String topic, byte[] data) {
return SerializationUtils.deserialize(data);
}
@Override
public void close() {
}
}
ii) Измените соответствующую конфигурацию в KafkaConfig:
/**
* ProducerFactory
* @return
*/
@Bean
public ProducerFactory<Object, Object> producerFactory(){
Map<String, Object> configs = new HashMap<String, Object>(); //参数
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
// configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ObjectSerializer.class);
return new DefaultKafkaProducerFactory<Object, Object>(configs);
}
...
/**
* ConsumerFactory
* @return
*/
@Bean
public ConsumerFactory<Object, Object> consumerFactory(){
Map<String, Object> configs = new HashMap<String, Object>(); //参数
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
// configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class); //需要把原来的消息删掉,不然会出现反序列化失败的问题
return new DefaultKafkaConsumerFactory<Object, Object>(configs);
}
Следует отметить, что после смены кодировщика и декодера сообщения необходимо очистить исходное сообщение в теме или использовать новую тему, иначе при десериализации исходное строковое сообщение будет ненормальным, помните.
iii) Добавьте новый метод в SimpleProducer.java:
/**
* 使用KafkaTemplate向Kafka推送数据
* @param topicName topic
* @param data
*/
public void sendObjectMessage(String topicName,Object data){
logger.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
try {
kafkaTemplate.send(topicName, data);
logger.info("推送数据成功!");
} catch (Exception e) {
logger.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
,topicName,data));
}
}
г) Тест:
Сначала добавьте класс тестовой сущности DemoObj.java:
package cn.zifangsky.kafkademo.model;
import java.io.Serializable;
public class DemoObj implements Serializable{
private static final long serialVersionUID = -8094247978023094250L;
private Long id;
private String data;
public DemoObj() {
}
public DemoObj(Long id, String data) {
this.id = id;
this.data = data;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public String toString() {
return "DemoObj [id=" + id + ", data=" + data + "]";
}
}
Затем добавьте новый метод в TestKafkaController.java:
@RequestMapping("/send2")
public String send2(DemoObj demoObj){
producer.sendObjectMessage(TOPIC2, demoObj);
return "发送数据【" + demoObj + "】成功!";
}
Наконец, добавьте соответствующих потребителей сообщений:
package cn.zifangsky.kafkademo.consumer;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import cn.zifangsky.kafkademo.model.DemoObj;
/**
* 消息消费者(group1)
* @author zifangsky
*
*/
@Component("groupListener1")
public class GroupListener1 {
private static final Logger logger = LoggerFactory.getLogger(GroupListener1.class);
@KafkaListener(topics={"topic-test2"},groupId="group1")
public void listenTopic2(DemoObj data){
System.out.println("Group1收到消息:" + data);
logger.info(MessageFormat.format("Group1收到消息:{0}", data));
}
}
Доступ в браузере: http://127.0.0.1:9090/kafka/send2?id=8&data=test9
Вывод в консоли следующий:
(5) Несколько групп потребителей используют одно и то же сообщение:
После версии Spring Kafka 2.x аннотация @KafkaListener добавляет параметр groupId для указания группы потребителей, к которой он принадлежит. Согласно принципу дизайна Kafka, если два разных потребителя находятся в двух разных группах потребителей, то они могут одновременно потреблять одно и то же сообщение (сообщение, отправленное производителем в определенный раздел определенной темы).
Поэтому добавьте новую группу потребителей на основе приведенного выше кода:
package cn.zifangsky.kafkademo.consumer;
import java.text.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import cn.zifangsky.kafkademo.model.DemoObj;
/**
* 消息消费者(group2)
* @author zifangsky
*
*/
@Component("groupListener2")
public class GroupListener2 {
private static final Logger logger = LoggerFactory.getLogger(GroupListener2.class);
@KafkaListener(topics={"topic-test2"},groupId="group2")
public void listenTopic2(DemoObj data){
System.out.println("Group2收到消息:" + data);
logger.info(MessageFormat.format("Group2收到消息:{0}", data));
}
// @KafkaListener(topics={"topic-test2"},groupId="group2")
// public void listenTopic2_2(DemoObj data){
// System.out.println("Group2_2收到消息:" + data);
// logger.info(MessageFormat.format("Group2_2收到消息:{0}", data));
// }
}
Протестируйте еще раз, вы можете обнаружить, что вывод выглядит следующим образом:
Очевидно, что результат такой, как мы и ожидали.
(6) Настройте @KafkaListener для приема сообщений в пакетном режиме:
Спринг Кафкаофициальная документацияВидно, что @KafkaListener поддерживает пакетное потребление сообщений, начиная с версии 1.1.Официальный пример выглядит следующим образом:
На самом деле я уже настроил соответствующую конфигурацию в KafkaConfig выше, то есть:
/**
* ConsumerFactory
* @return
*/
@Bean
public ConsumerFactory<Object, Object> consumerFactory(){
Map<String, Object> configs = new HashMap<String, Object>(); //参数
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
// configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class); //需要把原来的消息删掉,不然会出现反序列化失败的问题
return new DefaultKafkaConsumerFactory<Object, Object>(configs);
}
/**
* 添加KafkaListenerContainerFactory,用于批量消费消息
* @return
*/
@Bean
public KafkaListenerContainerFactory<?> batchContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setConcurrency(4);
containerFactory.setBatchListener(true); //批量消费
containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return containerFactory;
}
Затем добавьте новый метод в приведенный выше GroupListener1.java для тестирования:
@KafkaListener(topics={"topic-test"},groupId="group1",containerFactory="batchContainerFactory")
public void listenTopic1(List<String> data){
System.out.println("Group1收到消息:" + data);
logger.info(MessageFormat.format("Group1收到消息:{0}", data));
}
Окончательные результаты испытаний следующие:
Ссылаться на: