Подробное объяснение основного использования сообщений производства/потребления Spring Kafka 2.x.

Spring Boot Kafka
Подробное объяснение основного использования сообщений производства/потребления Spring Kafka 2.x.

1. Введение

Подобно Spring Data Redis, Spring Data MongoDB, Spring Data JPA и другим проектам, Spring Kafka предоставляет способ доступа к кластерам Kafka посредством простой настройки в приложениях Spring.

В этой статье я расскажу, как производители сообщений отправляют сообщения в кластеры Kafka в приложениях Spring, как потребители сообщений потребляют сообщения, как потребляют сообщения пакетами и как несколько групп потребителей потребляют сообщения одновременно.

Следует отметить, что для использования новейших функций Spring Kafka следующий тестовый код собран с использованием Spring Boot 2.0.0, и доступны все адреса ссылок на исходный код:git ee.com/word way can/K…

Два основных использования Spring Kafka

(1) Добавьте зависимости в pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Примечание. Поскольку для сборки проекта я использовал Spring Boot, я не указывал конкретную версию Spring Kafka при добавлении зависимостей (фактическая версия пакета jar — 2.1.x). Подробный файл pom.xml может относиться к:git ee.com/word way can/K…

(2) Базовая конфигурация:

i) Если вы создаете свой проект с помощью Spring Boot, вы можете просто добавить следующую конфигурацию в файл свойств:

#kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties
#指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
#指定默认topic id
spring.kafka.template.default-topic=topic-test
#指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
#每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
#指定默认消费者group id
spring.kafka.consumer.group-id=myGroup1
#若设置为earliest,那么会从头开始读partition
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

ii) Если вы используете обычный Maven для сборки проекта или хотите настроить дополнительную конфигурацию, вы можете использовать следующую конфигурацию JavaConfig:

package cn.zifangsky.kafkademo.config;
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
 
/**
 * Kafka配置
 * @author zifangsky
 */
@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Value("${kafka.producer.bootstrapServers}")
    private String producerBootstrapServers; //生产者连接Server地址
    
    @Value("${kafka.producer.retries}")
    private String producerRetries; //生产者重试次数
    
    @Value("${kafka.producer.batchSize}")
    private String producerBatchSize;
    
    @Value("${kafka.producer.lingerMs}")
    private String producerLingerMs;
    
    @Value("${kafka.producer.bufferMemory}")
    private String producerBufferMemory;
    
    
    @Value("${kafka.consumer.bootstrapServers}")
    private String consumerBootstrapServers;
    
    @Value("${kafka.consumer.groupId}")
    private String consumerGroupId;
    
    @Value("${kafka.consumer.enableAutoCommit}")
    private String consumerEnableAutoCommit;
    
    @Value("${kafka.consumer.autoCommitIntervalMs}")
    private String consumerAutoCommitIntervalMs;
    
    @Value("${kafka.consumer.sessionTimeoutMs}")
    private String consumerSessionTimeoutMs;
    
    @Value("${kafka.consumer.maxPollRecords}")
    private String consumerMaxPollRecords;
    
    @Value("${kafka.consumer.autoOffsetReset}")
    private String consumerAutoOffsetReset;
    
    /**
     * ProducerFactory
     * @return
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); //参数
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        
        return new DefaultKafkaProducerFactory<Object, Object>(configs);
    }
    
    /**
     * KafkaTemplate
     * @param producerFactory
     * @return
     */
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(){
        return new KafkaTemplate<Object, Object>(producerFactory(), true);
    }
 
    /**
     * ConsumerFactory
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); //参数
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }
    
    /**
     * 添加KafkaListenerContainerFactory,用于批量消费消息
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true); //批量消费
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        
        return containerFactory;
    }
    
}

Примечание. Используемые выше свойства:

kafka.producer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.producer.retries=3
#16K
kafka.producer.batchSize=16384
kafka.producer.lingerMs=1
#32M
kafka.producer.bufferMemory=33554432
 
kafka.consumer.bootstrapServers=192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094
kafka.consumer.groupId=0
kafka.consumer.enableAutoCommit=false
kafka.consumer.autoCommitIntervalMs=1000
kafka.consumer.sessionTimeoutMs=30000
kafka.consumer.maxPollRecords=100
#earliest,latest
kafka.consumer.autoOffsetReset=earliest

Примечание. Подробное значение этих свойств см. в официальной документации:kafka.apache.org/document ATI…

(3) Первый экземпляр создания/потребления сообщения:

i) Производитель сообщения:

package cn.zifangsky.kafkademo.producer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
 
/**
 * 消息生产者的第一个示例
 * @author zifangsky
 */
@Component("simpleProducer")
public class SimpleProducer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    
    /**
     * 使用KafkaTemplate向Kafka推送数据
     * @param topicName topic
     * @param data
     */
    public void sendMessage(String topicName,String data){
        logger.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
        
        try {
            kafkaTemplate.send(topicName, data);
            logger.info("推送数据成功!");
        } catch (Exception e) {
            logger.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
                    ,topicName,data));
        }
    }
 
}
 

ii) вызвать контроллер:

package cn.zifangsky.kafkademo.controller;
 
import javax.annotation.Resource;
 
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import cn.zifangsky.kafkademo.producer.SimpleProducer;
 
@RestController
@RequestMapping("/kafka")
public class TestKafkaController {
    @Resource(name="simpleProducer")
    private SimpleProducer producer;
    
    private final String TOPIC = "topic-test"; //测试使用topic
    
    @RequestMapping("/send")
    public String send(String data){
        producer.sendMessage(TOPIC, data);
        
        return "发送数据【" + data + "】成功!";
    }
 
}
 

iii) Потребители сообщений:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import cn.zifangsky.kafkademo.producer.SimpleProducer;
 
/**
 * 消息消费者的第一个示例
 * @author zifangsky
 */
@Component("simpleConsumer")
public class SimpleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    
    @KafkaListener(id="test",topics={"topic-test"})
    public void listen(String data){
        System.out.println("SimpleConsumer收到消息:" + data);
        logger.info(MessageFormat.format("SimpleConsumer收到消息:{0}", data));
    }
    
}
 

г) Тест:

После запуска проекта зайдите в браузере: http://127.0.0.1:9090/kafka/send?data=hahaha 1111

Вывод в консоли следующий:

Можно обнаружить, что этот простейший пример уже может работать нормально.

(4) Отправка/получение сообщений пользовательского типа:

В приведенных выше примерах все сообщения, которые мы отправляем/получаем, представляют собой простые строки, и суть заключается в использовании StringDeserializer и StringDeserializer для кодирования и декодирования сообщений. Однако в реальной разработке иногда может потребоваться отправить более сложные сообщения (например, мы хотим отправить сообщение с описанием погодных условий в определенной области, которое включает в себя несколько параметров, таких как температура, влажность, индекс загрязнения и т. д.). . В этом случае у нас обычно есть два способа добиться этого:

  • Способ 1. Преобразуйте объект Java в строку JSON перед отправкой сообщения, а затем отправьте его в кластер Kafka.
  • Способ 2: настроить кодировщик и декодер сообщений, напрямую отправлять объекты Java

i) Пользовательский кодировщик и декодер сообщений:

package cn.zifangsky.kafkademo.common;
 
import java.util.Map;
 
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.SerializationUtils;
 
public class ObjectSerializer implements Serializer<Object> {
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
 
    }
 
    /**
     * 序列化
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize(data);
    }
 
    @Override
    public void close() {
 
    }
 
}
 

package cn.zifangsky.kafkademo.common;
 
import java.util.Map;
 
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.util.SerializationUtils;
 
public class ObjectDeserializer implements Deserializer<Object> {
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
 
    }
 
    /**
     * 反序列化
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }
 
    @Override
    public void close() {
 
    }
 
}
 

ii) Измените соответствующую конфигурацию в KafkaConfig:

    /**
     * ProducerFactory
     * @return
     */
    @Bean
    public ProducerFactory<Object, Object> producerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); //参数
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
//        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ObjectSerializer.class);
        
        return new DefaultKafkaProducerFactory<Object, Object>(configs);
    }
    
        ...
 
    /**
     * ConsumerFactory
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); //参数
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
//        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class); //需要把原来的消息删掉,不然会出现反序列化失败的问题
        
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }

Следует отметить, что после смены кодировщика и декодера сообщения необходимо очистить исходное сообщение в теме или использовать новую тему, иначе при десериализации исходное строковое сообщение будет ненормальным, помните.

iii) Добавьте новый метод в SimpleProducer.java:

    /**
     * 使用KafkaTemplate向Kafka推送数据
     * @param topicName topic
     * @param data
     */
    public void sendObjectMessage(String topicName,Object data){
        logger.info(MessageFormat.format("开始向Kafka推送数据:{0}", data));
        
        try {
            kafkaTemplate.send(topicName, data);
            logger.info("推送数据成功!");
        } catch (Exception e) {
            logger.error(MessageFormat.format("推送数据出错,topic:{0},data:{1}"
                    ,topicName,data));
        }
    }

г) Тест:

Сначала добавьте класс тестовой сущности DemoObj.java:

package cn.zifangsky.kafkademo.model;
 
import java.io.Serializable;
 
public class DemoObj implements Serializable{
    private static final long serialVersionUID = -8094247978023094250L;
    private Long id;
    private String data;
 
    public DemoObj() {
 
    }
 
    public DemoObj(Long id, String data) {
        this.id = id;
        this.data = data;
    }
 
    public Long getId() {
        return id;
    }
 
    public void setId(Long id) {
        this.id = id;
    }
 
    public String getData() {
        return data;
    }
 
    public void setData(String data) {
        this.data = data;
    }
 
    @Override
    public String toString() {
        return "DemoObj [id=" + id + ", data=" + data + "]";
    }
 
}

Затем добавьте новый метод в TestKafkaController.java:

    @RequestMapping("/send2")
    public String send2(DemoObj demoObj){
        producer.sendObjectMessage(TOPIC2, demoObj);
        
        return "发送数据【" + demoObj + "】成功!";
    }

Наконец, добавьте соответствующих потребителей сообщений:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import cn.zifangsky.kafkademo.model.DemoObj;
 
/**
 * 消息消费者(group1)
 * @author zifangsky
 *
 */
@Component("groupListener1")
public class GroupListener1 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener1.class);
    
    @KafkaListener(topics={"topic-test2"},groupId="group1")
    public void listenTopic2(DemoObj data){
        System.out.println("Group1收到消息:" + data);
        logger.info(MessageFormat.format("Group1收到消息:{0}", data));
    }
    
}

Доступ в браузере: http://127.0.0.1:9090/kafka/send2?id=8&data=test9

Вывод в консоли следующий:

(5) Несколько групп потребителей используют одно и то же сообщение:

После версии Spring Kafka 2.x аннотация @KafkaListener добавляет параметр groupId для указания группы потребителей, к которой он принадлежит. Согласно принципу дизайна Kafka, если два разных потребителя находятся в двух разных группах потребителей, то они могут одновременно потреблять одно и то же сообщение (сообщение, отправленное производителем в определенный раздел определенной темы).

Поэтому добавьте новую группу потребителей на основе приведенного выше кода:

package cn.zifangsky.kafkademo.consumer;
 
import java.text.MessageFormat;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
import cn.zifangsky.kafkademo.model.DemoObj;
 
/**
 * 消息消费者(group2)
 * @author zifangsky
 *
 */
@Component("groupListener2")
public class GroupListener2 {
    private static final Logger logger = LoggerFactory.getLogger(GroupListener2.class);
    
    @KafkaListener(topics={"topic-test2"},groupId="group2")
    public void listenTopic2(DemoObj data){
        System.out.println("Group2收到消息:" + data);
        logger.info(MessageFormat.format("Group2收到消息:{0}", data));
    }
    
//    @KafkaListener(topics={"topic-test2"},groupId="group2")
//    public void listenTopic2_2(DemoObj data){
//        System.out.println("Group2_2收到消息:" + data);
//        logger.info(MessageFormat.format("Group2_2收到消息:{0}", data));
//    }
    
}

Протестируйте еще раз, вы можете обнаружить, что вывод выглядит следующим образом:

Очевидно, что результат такой, как мы и ожидали.

(6) Настройте @KafkaListener для приема сообщений в пакетном режиме:

Спринг Кафкаофициальная документацияВидно, что @KafkaListener поддерживает пакетное потребление сообщений, начиная с версии 1.1.Официальный пример выглядит следующим образом:

На самом деле я уже настроил соответствующую конфигурацию в KafkaConfig выше, то есть:

    /**
     * ConsumerFactory
     * @return
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory(){
        Map<String, Object> configs = new HashMap<String, Object>(); //参数
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitIntervalMs);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerMaxPollRecords); //批量消费数量
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
//        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ObjectDeserializer.class); //需要把原来的消息删掉,不然会出现反序列化失败的问题
        
        return new DefaultKafkaConsumerFactory<Object, Object>(configs);
    }
    
    /**
     * 添加KafkaListenerContainerFactory,用于批量消费消息
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(4);
        containerFactory.setBatchListener(true); //批量消费
        containerFactory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        
        return containerFactory;
    }

Затем добавьте новый метод в приведенный выше GroupListener1.java для тестирования:

    @KafkaListener(topics={"topic-test"},groupId="group1",containerFactory="batchContainerFactory")
    public void listenTopic1(List<String> data){
        System.out.println("Group1收到消息:" + data);
        logger.info(MessageFormat.format("Group1收到消息:{0}", data));
    }

Окончательные результаты испытаний следующие:

Ссылаться на: