Обзор Spring Boot Kafka, настройка и элегантная реализация публикации и подписки

Spring Boot

Эта статья принадлежит к оригиналу, перепечатайте и укажите источник, добро пожаловать, чтобы обратить внимание на апплет WeChat小白AI博客Публичный аккаунт WeChat小白AIили сайтxiaobaiai.net

[TOC]

1. Введение

Содержание этой статьи очень всеобъемлющее, очень длинное и очень подробное! Не волнуйтесь, не торопитесь! Я закончил писать, я думаю, вы обязательно сможете прочитать его после прочтения.Если у вас есть какие-либо вопросы, вы можете общаться в любое время!

Содержание этой статьи очень всеобъемлющее, очень длинное и очень подробное! Не волнуйтесь, не торопитесь! Я закончил писать, я думаю, вы обязательно сможете прочитать его после прочтения.Если у вас есть какие-либо вопросы, вы можете общаться в любое время!

Содержание этой статьи очень всеобъемлющее, очень длинное и очень подробное! Не волнуйтесь, не торопитесь! Я закончил писать, я думаю, вы обязательно сможете прочитать его после прочтения.Если у вас есть какие-либо вопросы, вы можете общаться в любое время!

В этой статье в основном представлена ​​общая конфигурация Spring Kafka, автоматическое создание темы, публикация сообщений в кластерах, подписка на сообщения (группы), конфигурация потоковой обработки и встроенная конфигурация Kafka для тестирования. Наконец, есть два способа добиться публикации сообщений и функций подписки. , один из которых основан наSpring IntegrationСпособ. Содержание этой статьи основано наSpring Kafka2.3.3 ДокументацияиДокументация, связанная с Spring Boot Kafka, Spring создаетSpring kafka, который инкапсулирует клиентскую часть Apache kafka (производитель/потребитель/потоковая обработка и т. д.) для быстрой интеграции kafka в проекты Spring,Spring-KafkaВ проекте предусмотрена автоматическая настройка Apache Kafka, упрощенная настройка через Spring Boot (сspring.kafka.*в качестве параметра конфигурации префикса), использование Kafka с Spring Boot особенно просто. И Spring Boot также предоставляет встроенный брокер Kafka для тестирования.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Для выполнения следующих функций требуется следующая среда:

  • Среда выполнения или среда разработки Java (JRE/JDK)
  • Кафка успешно установлена

Дополнительная конфигурация может относиться к《Kafka,ZK集群开发或部署环境搭建及实验》эта статья.

В этой статье мы пытаемся объяснить логику как можно яснее.Основной маршрут — глобальное представление основных функций и ключевых конфигураций Spring Kafka, в то время как Spring Boot еще больше упрощает настройку Spring Kafka, реализует функцию публикации и подписки с помощью нескольких аннотаций Kafka в Spring Boot и в то же время с помощью Spring Integration + Customize the Kafka метод конфигурации для достижения более сложной функции публикации и подписки Kafka.Эта статья охватила большую часть контента Spring Kafka с помощью моих собственных экспериментов и договоренностей в течение длительного времени. , Я надеюсь, что вы сможете терпеливо прочитать его и не стесняйтесь оставлять отзывы, если у вас есть какие-либо вопросы. Давайте учиться вместе.

2 Обзор возможностей Spring Kafka

Связи или совместимость версий Spring Kafka, Spring Integration и Kafka client (по состоянию на 9 декабря 2019 г.):

Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients
2.3.x 3.2.x 2.3.1
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x
2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0
1.3.x 2.3.x 0.11.0.x, 1.0.x

Для более конкретных функций версии, пожалуйста, обратитесь к официальному сайту Последняя версия Spring Kafka — 2.3.4.

Аннотации, относящиеся к Spring Kafka, следующие:

Тип аннотации описывать
EnableKafka включеноAbstractListenerContainerFactoryКонечные точки аннотаций прослушивателя Kafka, созданные под прикрытием для классов конфигурации;
EnableKafkaStreams Включить компонент потоковой передачи Kafka по умолчанию
KafkaHandler В классе, аннотированном с помощью KafkaListener, аннотация, помечающая метод как цель прослушивателя сообщений Kafka.
KafkaListener Аннотация, помечающая метод как цель прослушивателя сообщений Kafka в указанной теме.
KafkaListeners Аннотации контейнера, объединяющие несколько аннотаций KafkaListener
PartitionOffset Используется для добавления информации о разделе/начальном смещении в KafkaListener.
TopicPartition Используется для добавления информации о теме/разделе в KafkaListener.

Используемый@EnableKafkaможет контролироватьAbstractListenerContainerFactoryПодкласс целевой конечной точки, такой какConcurrentKafkaListenerContainerFactoryдаAbstractKafkaListenerContainerFactoryподкласс .

public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }

@EnableKafkaНет необходимости включать Kafka в Spring Boot, Spring Boot поставляется с автоматической настройкой Spring Kafka, поэтому нет необходимости использовать явный@EnableKafka. Если вы хотите реализовать класс конфигурации Kafka самостоятельно, вам нужно добавить@EnableKafka, если вы не хотите автоконфигурации Kafka, как в тестах, все, что вам нужно сделать, это удалитьKafkaAutoConfiguration:

@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 Автоматическое создание темы

💡 Чтобы создать тему при запуске приложения, вы можете добавитьNewTopicТип фасоли. Если тема уже существует, компонент игнорируется.

2.2 Отправка сообщения

ВеснаKafkaTemplateнастраивается автоматически, и вы можете автоматически связать его непосредственно с вашим собственным bean-компонентом, как показано в следующем примере:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

KafkaTemplateОбертывает производителя и предоставляет удобные методы для отправки данных в темы kafka. Предоставляет асинхронные и синхронные (блокирующие отправку) методы, а также асинхронные (неблокирующие отправку) методы возврата.ListenableFuture, для отслеживания статуса асинхронной отправки, успеха или неудачи, KafkaTemplate предоставляет следующий интерфейс:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

sendDefaultAPI требует, чтобы шаблону была предоставлена ​​тема по умолчанию. Некоторые API принимают временную метку в качестве параметра и сохраняют ее в записи. Способ хранения предоставленной пользователем временной метки зависит от типа временной метки, настроенного в теме Kafka. Если тема настроена на использованиеCREATE_TIME, регистрируется указанная пользователем временная метка (создается, если она не указана). Если тема настроена на использованиеLOG_APPEND_TIME, указанная пользователем временная метка игнорируется, и прокси-сервер добавит локальное время прокси-сервера.metricsиpartitionsForМетоды делегируются тем же методам базового производителя. Метод execute обеспечивает прямой доступ к базовому производителю.

Чтобы использовать шаблоны, настройте фабрику производителей и укажите ее в конструкторе шаблона. В следующем примере показано, как это сделать:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // KafkaTemplate构造函数中输入生产者工厂配置
    return new KafkaTemplate<Integer, String>(producerFactory());
}

Затем, чтобы использовать шаблон, вы можете вызвать один из его методов для отправки сообщения.

когда вы используете включитьMessage<?>Когда метод параметра, тема, раздел и информация о ключе предоставляются в заголовке сообщения со следующими подпунктами:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

Например, чтобы получить доступ к определенной части информации в информации заголовка:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}",  message.getHeaders().get(KafkaHeaders.TOPIC));
}

Необязательная функция заключается в том, что вы можете использоватьProducerListenerнастроитьKafkaTemplate, чтобы получить асинхронный обратный вызов с отправленным результатом (успех или сбой) вместо ожидания завершения будущего. Следующий список показываетProducerListenerОпределение интерфейса:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

По умолчанию конфигурация шаблона имеетLoggingProducerListener, который регистрирует только ошибки и ничего не делает, если отправка прошла успешно. только тогда, когдаisInterestedInSuccessВызывается только тогда, когда он возвращает trueonSuccess. Для удобства, если вы хотите реализовать только один из методов, будет предоставлена ​​абстракцияProducerListenerAdapter. заisInterestedInSuccess, он возвращает ложь. Асинхронный обратный вызов результата показан ниже:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");  
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");  
        }

    });
}

Если вы хотите, чтобы блокирующий поток отправки ждал результата, вы можете вызватьfutureизget()метод. вы можете позвонить перед ожиданиемflush(), или для удобства шаблон имеетautoFlushконструктор параметра, который приводит к шаблону каждый раз, когда он отправляетсяflush(). Однако имейте в виду, что обновление может значительно снизить производительность:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

Используйте DefaultKafkaProducerFactory:

использовать как указано вышеKafkaTemplateПоказано в,ProducerFactoryИспользуется для создания производителя. По умолчанию, когда транзакции не используются,DefaultKafkaProducerFactoryсоздаст одиночный производитель для использования всеми клиентами, напримерKafkaProducerкак предложено в javadocs. Однако если для шаблона вызывается flush(), это может вызвать задержки в других потоках, использующих того же производителя. Начиная с версии 2.3,DefaultKafkaProducerFactoryимеет новое свойствоproducerPerThread. когда установлено наtrue, фабрика создаст (и кэширует) отдельного производителя для каждого потока, чтобы избежать этой проблемы.

когдаproducerPerThreadПри значении true пользовательский код должен вызывать фабрику, когда производитель больше не нужен.closeThreadBoundProducer(). Это фактически закроет производителя и удалит его изThreadLocalудалено в. Вызов reset() или destroy() не очистит этих производителей.

СоздайтеDefaultKafkaProducerFactory, классы сериализатора ключей и/или значений можно получить из конфигурации, вызвав конструктор, который принимает только карту свойств (см. пример в разделе Использование KafkaTemplate), или экземпляр сериализатора можно передать вDefaultKafkaProducerFactoryКонструктор (в этом случае все производители используют один и тот же экземпляр). В качестве альтернативы можно предоставитьSupplier<Serializer> s(начиная с версии 2.3) для получения отдельного для каждого производителяSerializerПример:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

Используя ReplyingKafkaTemplate:

Версия2.1.3представилKafkaTemplateПодкласс для предоставления семантики запроса/ответа. Этот класс называетсяReplyingKafkaTemplate, и имеет метод (помимо тех, что в суперклассе). В следующем списке показаны сигнатуры методов:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

результатListenableFuture, который заполняется асинхронно с результатом (или с исключением по тайм-ауту). Результат другойsendFutureсобственность, которая называетсяKafkaTemplate.send()результат. Вы можете использовать это Future для определения результата операции отправки. Здесь он не будет расширяться.

2.3 Получение сообщений

можно настроитьMessageListenerContainerи предоставить прослушиватель сообщений или использовать@KafkaListenerАннотация для получения сообщений.

2.3.1 Прослушиватель сообщений

При использовании контейнера прослушивателя сообщений необходимо предоставить прослушиватель для получения данных. В настоящее время существует восемь интерфейсов, поддерживаемых прослушивателями сообщений. В следующем списке показаны эти интерфейсы:

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 作接收的单个ConsumerRecord实例
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的单个ConsumerRecord实例。提供对消费者对象的访问。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。
public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。
public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自动提交或容器管理的提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。使用此接口时不支持AckMode.RECORD,因为监听器已获得完整的批处理。提供对使用者对象的访问。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手动提交方法之一时,使用此接口处理从Kafka 消费者 poll() 操作接收的所有ConsumerRecord实例。提供对使用者对象的访问。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

Приведенный выше потребительский объект не является потокобезопасным. Его методы можно вызывать только в том потоке, который вызвал слушателя.

2.3.1.1 Контейнер прослушивателя сообщений

обеспечивает дваMessageListenerContainerРеализация:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainerПолучать все сообщения из всех разделов или разделов в одном потоке (т. е. раздел может быть назначен только одному потребителю, потребителю может быть назначено несколько разделов).ConcurrentMessageListenerContainerделегировать одному или несколькимKafkaMessageListenerContainerЭкземпляры для обеспечения многопоточного использования для обработки всех сообщений темы или раздела из нескольких потоков.

Начиная с версии Spring Kafka 2.2.7, вы можете добавитьRecordInterceptorДобавляется в контейнер прослушивателя; он будет вызываться перед вызовом прослушивателя, чтобы разрешить проверку или изменение записей. Если перехватчик возвращает null, прослушиватель не вызывается. Прослушиватель не вызывается, если прослушиватель является пакетным прослушивателем. Начиная с версии 2.3,CompositeRecordInterceptorМожет использоваться для вызова нескольких перехватчиков.

По умолчанию при использовании транзакций прослушиватель вызывается после запуска транзакции. Начиная с версии 2.3.4 вы можете установить контейнер прослушивателяinterceptBeforeTxсвойство для вызова прослушивателя до начала транзакции. Для пакетных слушателей не предусмотрены прослушиватели, поскольку Kafka уже предоставляетConsumerInterceptor.

2.3.1.2 Использование KafkaMessageListenerContainer

Доступны следующие конструкторы:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

получить один из каждогоConsumerFactoryи информацию о темах и разделах, а такжеContainerPropertiesДополнительная настройка в объекте.ConcurrentMessageListenerContainer(Подробнее об этом позже) Распределить по экземплярам потребителя с помощью второго конструктораTopicPartitionOffset.ContainerPropertiesИмеет следующий конструктор:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

Первый конструктор принимаетTopicPartitionOffsetМассив аргументов для явного указания, какие разделы контейнер хочет использовать (используя метод assign() потребителя), и необязательное начальное смещение. По умолчанию положительные значения являются абсолютными смещениями. По умолчанию отрицательные значения относятся к текущему последнему смещению внутри раздела. при условииTopicPartitionOffsetконструктор, который принимает дополнительный логический параметр. Если true, начальное смещение (положительное или отрицательное) относится к текущему положению этого потребителя. Смещение применяется при запуске контейнера. Второй — это массив тем, на которых основана Кафка.group.idСвойства: Распределить разделы в группе для назначения разделов. Третий использует регулярные выражения для выбора тем.

кMessageListenerНазначается контейнеру, может использоваться при создании контейнераContainerProps.setMessageListenerметод. В следующем примере показано, как это сделать:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

Обратите внимание, что при созданииDefaultkafkafkaconsumerfactoryПри использовании конструктора конструктор основывается только на его характеристиках, а это означает, что категория Deserializer для ключа/значения получается из конфигурации. В качестве альтернативы экземпляр десериализатора может быть передан ключу/значениюDefaultKafkaConsumerFactoryКонструктор, в этом случае все потребители используют один и тот же экземпляр. Другой вариант — предоставитьSupplier<Deserializer>s(начиная с версии 2.3), чтобы получить отдельный экземпляр десериализатора для каждого потребителя:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

См. Javadoc для получения дополнительной информации о различных свойствах, которые можно установить.ContainerProperties.

Начиная с версии Spring Kafka 2.1.1, файл с именемlogContainerConfigДоступны новые свойства. Если включено ведение журнала INFO, каждый контейнер прослушивателя записывает сообщение журнала, в котором обобщаются его свойства конфигурации.

Например, чтобы изменить уровень журнала на INFO, вы можете использоватьcontainerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO).

Начиная с версии Spring Kafka 2.2, дополнение под названиемmissingtopicsfailalновое свойство контейнера (по умолчанию: true). Это предотвратит запуск контейнера, если ни один клиент не опубликует или не подпишется на тему, для которой он существует в брокере. Неприменимо, если контейнер настроен на прослушивание шаблона темы (регулярное выражение). Раньше контейнерные потоки былиconsumer.poll()Метод зацикливается и ожидает появления темы, когда регистрируется много сообщений. Кроме журналов, нет никаких указаний на проблему. Чтобы восстановить предыдущее поведение, вы можете установить свойство в false.В это время Брокер устанавливает элемент allow.auto.create.topics=true, а свойство контейнера равно false, он автоматически создаст несуществующую тему. .

2.3.1.3 Использование ConcurrentMessageListenerContainer

Одиночный конструктор аналогичен первомуKafkaListenerContainerКонструктор. В следующем списке показана подпись конструктора:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

Он также имеет свойство параллелизма. Например,container.setConcurrency(3)означает создание трехKafkaMessageListenerContainerпример. Для первого конструктора Kafka использует свои возможности управления группами для распределения разделов между потребителями.

При прослушивании нескольких тем распределение разделов по умолчанию может отличаться от ожидаемого. Например, если у вас есть три темы с пятью разделами в каждой и вы хотите использоватьconcurrency=15, то вы видите только пять активных потребителей, каждому из которых назначен раздел из каждой темы, а остальные десять потребителей простаивают. Это потому, что Kafka по умолчаниюPartitionAssignorдаRangeAssignor(см. его Javadoc). В этом случае вы можете рассмотреть возможность использованияRoundRobinAssignor, который распределяет раздел среди всех потребителей. Затем назначьте каждому потребителю тему или раздел. ИзменитьPartitionAssignor, вы можете предоставитьDefaultKafkaConsumerFactoryустановить в свойствахpartition.assignment.strategyПараметры конфигурации потребителя (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).

При использовании Spring Boot стратегию настроек можно назначить следующим образом:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

Для второго конструктораConcurrentMessageListenerContainerбудетTopicPartitionЭкземпляры распределяются в делегатеKafkaMessageListenerContainerна экземпляре.

Например, если предоставлено шестьTopicPartitionэкземпляр с параллелизмом 3; каждый контейнер получает два раздела. на пятьTopicPartitionНапример, два контейнера получают два раздела, а третий контейнер получает один раздел. Если параллелизм больше, чемTopicPartitions, параллелизм снижается, так что каждый контейнер получает один раздел. Способ настройки раздела может использовать инструмент командной строкиkafka-topics.shЗапрос и настройка количества разделов по теме. Вы также можете добавитьNewTopicBean, если число, установленное NewTopic, больше текущего числа, автоматическая настройка весенней загрузкиKafkaAdminПерегородка будет скорректирована вверх.

Свойство client.id (если установлено) будет добавлено-n, где n — экземпляр потребителя, соответствующий параллелизму. Это необходимо для предоставления уникальных имен для MBean-компонентов при включенном JMX.

Начиная с версии Spring Kafka 1.3,MessageListenerContainerобеспечивает основуKafkaConsumerметрический доступ. заConcurrentMessageListenerContainer,metrics()метод возвращает все целиKafkaMessageListenerContainerПоказатели экземпляра. судя по днуKafkaConsumerкоторый предоставилclient-idМетрики сгруппированы вMap<MetricName, ?extends Metric>.

Начиная с версии 2.3,ContainerPropertiesпредоставилidleBetweenPollsопция, позволяющая запускать основной цикл в контейнере прослушивателяKafkaConsumer.poll()Спите между звонками. Выберите фактический интервал ожидания в качестве минимального значения из предоставленных вариантов и выберитеmax.poll.interval.msРазница между конфигурацией потребителя и текущим временем записи пакета.

2.3.1.4 Фиксация смещения

Предусмотрено несколько вариантов фиксации смещений. еслиenable.auto.commitСвойство пользователяtrue, Kafka автоматически зафиксирует смещение в соответствии со своей конфигурацией. еслиfalse, контейнер поддерживает несколькоAckModeНастройки (описаны в следующем списке). Режим подтверждения по умолчанию — пакетный. Начиная с версии 2.3, фреймворк будетenable.auto.commitУстановить какfalse, если это явно не указано в конфигурации. Ранее, если свойство не было установлено, использовалось значение Kafka по умолчанию (true). потребительpoll()метод возвращает один или несколькоConsumerRecords. вызывается для каждой записиMessageListener. В следующем списке описывается контейнер для каждогоAckModeДействие предпринято:

  • RECORD: фиксирует смещение, когда прослушиватель возвращается после обработки записи.
  • ПАРТИЯ: сделаноpoll()Фиксировать смещение после возврата всех записей.
  • ВРЕМЯ: после обработкиpoll()Фиксировать смещения после возврата всех записей, если они превышают смещение фиксации с момента последней фиксацииackTime
  • COUNT: после обработкиpoll()Все записи, возвращаемые после смещения фиксации, если они были получены с момента последней фиксацииackCountзаписывать.
  • COUNT_TIME: похоже наTIMEиCOUNT, но если оба условия истинны, фиксация выполняется.
  • РУКОВОДСТВО: Слушатель сообщений отвечает заacknowledge()иAcknowledgment. После этого применяется та же семантика, что и для BATCH.
  • MANUAL_IMMEDIATE: вызов слушателяAcknowledgement.acknowledge()Метод немедленно фиксирует смещение.

MANUAL и MANUAL_IMMEDIATE требуют, чтобы прослушиватель былAcknowledgingMessageListenerилиBatchAcknowledgingMessageListener. См. прослушиватель сообщений.

в соответствии сsyncCommitsсвойства контейнера, используйтеcommitSync()илиcommitAsync()метод. по умолчанию,syncCommitsверно; см. такжеsetSyncCommitTimeout. видетьsetCommitCallbackчтобы получить результат асинхронной фиксации; обратный вызов по умолчаниюLoggingCommitCallback, который регистрирует ошибки (и успехи на уровне отладки).

Поскольку контейнер прослушивателя имеет собственный механизм фиксации смещений, он ожидает, что KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGявляется ложным. Начиная с версии 2.3, он будет безоговорочно устанавливать значение false, если это специально не установлено в фабрике-потребителе или переопределении потребительских свойств контейнера.

AcknowledgmentСуществуют следующие методы:

public interface Acknowledgment {
    void acknowledge();
}

Этот метод позволяет слушателю контролировать, когда фиксируется смещение.

Начиная с версии 2.3, интерфейс подтверждения имеет два дополнительных метода.nack(long sleep)иnack(int index, long sleep). Первый предназначен для прослушивания журналов, а второй — для пакетных прослушивателей. Вызов неправильного метода для типа прослушивателя вызоветIllegalStateException.

nack() можно вызывать только в потоке-потребителе, вызвавшем прослушиватель.

При использовании пакетных прослушивателей индекс может быть указан в неудачном пакете. перечислитьnack(), смещение записи будет зафиксировано перед индексированием и поиском в разделе ошибочных и отброшенных записей, чтобы в следующий разpoll()при повторном прохождении этих смещений. верноSeekToCurrentBatchErrorHandlerулучшение,SeekToCurrentBatchErrorHandlerТолько всю партию можно найти для повторной доставки.

Примечание. При использовании назначения разделов с управлением группами убедитесь, что параметр сна (плюс время, затраченное на обработку последней опрашиваемой записи) меньше, чемconsumer max.poll.interval.msАтрибуты очень важны.

2.3.1.5 Автозапуск контейнера прослушивателя и запуск вручную

Контейнер прослушивателя реализуетSmartLifecycle(пройти черезSmartLifecycleПосле того, как Spring загружает и инициализирует все bean-компоненты, затем выполняет некоторые задачи или запускает необходимые асинхронные службы), по умолчаниюautoStartupзаtrue. Контейнер запускается позже (Integer.MAX-VALUE - 100). выполнитьSmartLifecycleДругие компоненты для обработки данных от слушателя должны запускаться на более ранней стадии.-100Оставляет место для более поздних этапов, чтобы компоненты могли запускаться автоматически после контейнера. Например, мы проходим@BeanПередайте контейнер слушателя руководству Spring, на этот раз черезSmartLifecycleЗадача инициализации выполняется автоматически, но когда мы вручную передаем новый экземпляр контейнера прослушивателя, пост-инициализация не будет выполняться, напримерKafkaMessageListenerContainerЭкземпляры необходимо запускать вручнуюstart().

autoStartupУстановка true и false при ручном запуске start не имеет никакого эффекта.@KafkaListenerЭтот подраздел Управления Циклом Декларации.

2.3.2 Аннотация @KafkaListener

2.3.2.1 Record Listeners

@KafkaListenerАннотация используется для обозначения метода компонента в качестве прослушивателя для контейнера прослушивателя. фасоль завернута вMessagingMessageListenerAdapter, адаптер конфигурируется с различными функциями, такими как преобразователи, которые преобразуют данные (при необходимости) в соответствии с параметрами метода. Используя заполнители свойств (${…}), или вы можете использовать SpEL (#{…​}) для большинства свойств в аннотациях конфигурации. См. Javadoc для получения дополнительной информации.

@KafkaListener:

  • id: уникальный идентификатор слушателя. Если GroupId не настроен, автоматически генерируется идентификатор по умолчанию. После указания этого значения идентификатор группы будет перезаписан.
  • containerFactory: как упоминалось выше, @KafkaListener нужно только настроить свойство containerFactory аннотации, чтобы различать потребление отдельных данных или нескольких данных, что настраивает прослушивающую фабрику контейнеров, котораяConcurrentKafkaListenerContainerFactory, настройте имя компонента
  • topics: Тема для мониторинга, может отслеживать несколько тем, может быть выражением или ключевыми словами-заполнителями или прямыми именами тем, например, мониторинг нескольких тем:{"topic1" , "topic2"}
  • topicPattern: Тематический режим для этого слушателя. Записи могут быть «Тематические шаблоны», «Ключи-заполнители атрибутов» или «Выражения». Платформа создаст контейнер, который подписывается на все темы, соответствующие указанному шаблону для динамически выделяемых разделов. Сопоставление шаблонов будет периодически выполняться по темам, которые существуют на момент проверки. Выражение должно разрешаться в шаблон темы (поддерживаются строковые или шаблонные типы результатов). При этом используется управление группами, и Kafka назначит разделы членам группы.
  • topicPartitions: для использования ручного назначения темы/раздела
  • errorHandler: прослушивание обработчика исключений, настройка имени компонента, по умолчанию пусто
  • groupId: идентификатор группы потребителей
  • idIsGroup: Является ли идентификатор GroupId
  • clientIdPrefix: Префикс идентификатора потребителя
  • beanRef: имя компонента реального прослушивающего контейнера, вам нужно добавить «__» перед именем компонента.

@KafkaListenerАннотации предоставляют механизм для простых слушателей POJO. В следующем примере показано, как его использовать:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

Этот механизм действует@Configurationна одном из занятий@EnableKafkaАннотации и основы настройкиConcurrentMessageListenerContainerКонтейнерная фабрика слушателей. По умолчанию он должен называтьсяkafkaListenerContainerFactoryбоб. В следующем примере показано, как использоватьConcurrentMessageListenerContain:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

Обратите внимание, что для установки свойств контейнера необходимо использовать на фабрикеgetContainerProperties()метод. Он используется в качестве шаблона для внедрения фактических свойств контейнера.

Начиная с версии 2.1.1 теперь можно настроить потребителя для создания аннотаций.client.idАтрибуты.clientdprefixСуффикс-n, где n — целое число, представляющее номер контейнера при использовании параллелизма.

Начиная с версии 2.2, теперь можно переопределить свойства параллелизма и автозапуска фабрики контейнеров, используя свойства самой аннотации. Свойства могут быть простыми значениями, заполнителями свойств или выражениями SpEL. В следующем примере показано, как это сделать:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

Вы также можете настроить прослушиватели POJO с явными темами и разделами (и необязательными начальными смещениями). В следующем примере показано, как это сделать:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

ты сможешьpartitionsилиpartitionOffsetsКаждый раздел указан в атрибуте, но не оба одновременно.

использовать руководствоAckMode, вы также можете предоставить слушателюAcknowledgment. В приведенном ниже примере также показано, как использовать различные фабрики контейнеров:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

Наконец, метаданные о сообщении можно получить из заголовка сообщения. Вы можете получить содержимое заголовка сообщения, используя следующие имена заголовков:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

Пример:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
2.3.2.2 Пакетный прослушиватель

Начиная с версии 1.1 можно настроить@KafkaListenerспособ получения всего пакета потребительских записей, полученных от потребителя. Чтобы настроить фабрику контейнеров прослушивателя для создания пакетных прослушивателей, вы можете установитьbatchListenerАтрибуты. В следующем примере показано, как это сделать:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

В следующем примере показано, как получить список полезных данных:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

Темы, разделы, смещения и т.д. доступны в шапке параллельно полезной нагрузке. В следующем примере показано, как использовать заголовки:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Кроме того, вы можете получить список сообщенийMessage<?>объект, содержащий каждое смещение и другие детали в каждом сообщении, но он должен быть единственным параметром (за исключением Подтверждения и Подтверждения при использовании ручных коммитов)/илиConsumer<?, ?>параметр). В следующем примере показано, как это сделать:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

В этом случае преобразование полезной нагрузки не выполняется. еслиBatchMessagingMessageConverterнастроенRecordMessageConverter, вы также можете добавить универсальный тип к параметру сообщения и преобразовать полезную нагрузку. Дополнительные сведения см. в разделе Преобразование полезной нагрузки с помощью пакетных прослушивателей.

вы также можете получитьConsumerRecord<?, ?>объект, но он должен быть единственным параметром (при использовании ручной отправки илиConsumer<?, ?>параметр, за исключением необязательного подтверждения). В следующем примере показано, как это сделать:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

Начиная с версии 2.2 слушатель может получатьpoll()Метод возвращает полныйConsumerRecords<?, ?>объект, который позволяет слушателям получать доступ к другим методам, таким какpartitions()(возвращаетTopicPartitionэкземпляр) иrecords(TopicPartition) (Получить выборочные записи). Опять же, это должен быть единственный параметр (при использовании ручной фиксации илиConsumer<?, ?>параметр, за исключением необязательного подтверждения). В следующем примере показано, как это сделать:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 Проверка @KafkaListener@Payload

Начиная с версии 2.2 теперь проще добавить валидатор для проверки@KafkaListener``@Payloadпараметр. Раньше вам приходилось настраиватьDefaultMessageHandlerMethodFactoryи добавить в регистратор. Теперь можно добавить валидатор в сам регистратор. Следующий код иллюстрирует, как это сделать:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}

Когда вы используете Spring Bootvalidation starter, который автоматически настраиваетсяLocalValidatorFactoryBean, как показано в следующем примере:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

В следующем примере показано, как проверить:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 Слушатели перебалансировки

ContainerPropertiesEстьconsumerRebalanceListenerсвойство, которое принимает клиент KafkaconsumerRebalanceListeneРеализация интерфейса r. Если это свойство не указано, контейнер настроит прослушиватель журнала, который будет регистрировать события перебалансировки на информационном уровне. Фреймворк также добавляет субинтерфейсConsumerRawareRebalanceListener. Следующий список показываетConsumerRawareRebalanceListenerОпределение интерфейса:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 Пересылка сообщений прослушивателя

Начиная с версии 2.0, если также используется@SendToАннотации Примечания@KafkaListener, а вызов метода возвращает результат, результат пересылается@SendToуказанный предмет. как:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

2.3.6 Управление жизненным циклом @KafkaListener

за@KafkaListenerКонтейнер прослушивателя, созданный аннотацией, не является компонентом в контексте приложения. Вместо этого они используютKafkaListenerEndpointRegistryТип зарегистрированного компонента инфраструктуры. Этот bean-компонент автоматически объявляется фреймворком и управляет жизненным циклом контейнера; он автоматически запускает любойautoStartupУстановить какtrueконтейнер. Все контейнеры, созданные всеми контейнерными фабриками, должны находиться в одномphase. Подробнее см.Прочитайте контейнер слушателя для автоматического запуска. Вы можете программно управлять жизненным циклом с помощью реестра. Запуск или остановка реестра запустит или остановит все зарегистрированные контейнеры. В качестве альтернативы ссылку на отдельный контейнер можно получить, используя его атрибут id. Можно установить в аннотацияхautoStartup, который переопределяет параметры по умолчанию, заданные в фабрике контейнеров (setAutoStartup(true)). Вы можете получить ссылку на bean-компонент из контекста приложения, такого как autowiring, для управления контейнером, в котором он зарегистрирован. В следующем примере показано, как это сделать:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

Реестр поддерживает только жизненный цикл контейнеров, которыми он управляет; контейнеры, объявленные как компоненты, не управляются реестром и могут быть получены из контекста приложения. позвонив в регистратуруgetListenerContainers()метод для получения коллекции управляемых контейнеров. В Spring Kafka версии 2.2.5 добавлен удобный методgetAllListenerContainers(), который возвращает коллекцию всех контейнеров, в том числе управляемых реестром и объявленных как bean-компоненты. Возвращаемая коллекция будет включать в себя все инициализированные bean-компоненты-прототипы, но не будет инициализировать никакие объявления отложенных bean-компонентов.

2.4 Потоковая обработка

Spring for Apache KafkaФабричный компонент предоставляется для созданияStreamsBuilderобъекты и управлять жизненным циклом их потоков. Пока поток kafka находится на пути к классам, а поток kafka проходит через@EnableKafkaStreamsКогда аннотация включена, Spring Boot автоматически настроит необходимыеKafkaStreamsConfigurationбобы.

Включение Kafka Streams означает необходимость установки идентификатора приложения и серверов начальной загрузки. Первый можно использоватьspring.kafka.streams.application-idconfig, если не задано, по умолчанию используетсяspring.application.name. Последнее может быть установлено глобально или переопределено специально для потоков.

С помощью выделенного свойства можно использовать несколько других свойств; можно использоватьspring.Kafka.streams.propertiesПространство имен устанавливает другие произвольные свойства Kafka. Чтобы получить больше информации,Additional Kafka Properties .

По умолчанию он создаетStreamBuilderПотоки, управляемые объектами, запустятся автоматически. можно использоватьspring.kafka.streams.auto-startupсвойства, чтобы настроить это поведение.

Чтобы использовать фабричный компонент, просто поместитеStreamsBuilderПодключен к@bean, как показано в следующем примере:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

По умолчанию он создаетStreamBuilderПотоки, управляемые объектами, запустятся автоматически. можно использоватьspring.kafka.streams.auto-startupсвойства, чтобы настроить это поведение.

2.5 Дополнительная конфигурация

Свойства, поддерживаемые автонастройкой, показаны наОбщие свойства приложениясередина. Обратите внимание, что в большинстве случаев эти свойства (стиль с дефисом или верблюжьим регистром) напрямую сопоставляются с точечными свойствами Apache Kafka. Для получения дополнительной информации см.Apache Kafkaдокументация.

Некоторые из упомянутых ранее свойств применяются ко всем компонентам (производителям, потребителям, администраторам и потокам), но их можно указать на уровне компонента, если вы хотите использовать другие значения. Apache Kafka определяет важность какHIGH,MEDIUMилиLOWхарактеристики. Автоматическая настройка Spring Boot поддерживает все свойства высокой важности, некоторые выбранные средние и низкие свойства, а также любые свойства без значений по умолчанию.

Только подмножество свойств, поддерживаемых Kafka, может быть передано черезKafkaPropertiesКласс используется напрямую, если вы хотите настроить производителя или потребителя с другими свойствами, которые не поддерживаются напрямую, используйте следующие свойства:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

Приведенный выше пример настройки параметров будет общим.prop.oneСвойство Kafka установлено наfirst(относится к производителям, потребителям и администраторам),prop.twoСвойство администратора установлено наsecond,prop.threeпотребительская собственность устанавливается вthird,prop.fourСвойство производителя установлено вfourth,prop.fiveСвойство потоков установлено вfifth.

Вы также можете настроить Spring KafkaJsonDeserializer,Следующее:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

Так же можно отключитьJsonSerializerПоведение по умолчанию для отправки информации о типе в заголовках:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

Примечание. Установленные таким образом свойства переопределяют любые элементы конфигурации, которые явно поддерживаются Spring Boot.

2.6 Тестирование со встроенной Kafka

Spring для Apache Kafka предоставляет удобный способ тестирования проектов со встроенными брокерами Apache Kafka. Чтобы использовать эту функцию, используйте тестовый модуль Spring Kafka в@EmbeddedKafkaАннотировать тестовый класс. Для получения дополнительной информации см.Spring For Apache KafkaСправочник.

Чтобы автоматическая настройка Spring Boot работала с ранее упомянутым встроенным брокером Apache Kafka, адрес встроенного брокера (установленныйEmbeddedKafkaBrokerFill) системные свойства переназначаются в свойства конфигурации Spring Boot для Apache Kafka. Есть несколько способов сделать это:

  • Предоставляет системное свойство для сопоставления встроенных прокси-адресов сspring.kafka.bootstrap-servers:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • существует@EmbeddedKafkaИмя атрибута конфигурации в аннотации:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • Используйте заполнители в свойствах конфигурации:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Поддержка интеграции Spring

Spring Integration также имеет адаптер для Kafka, поэтому мы можем легко использовать Spring Integration для реализации публикации и подписки.Конечно, вы также не можете использовать Spring Integration.

Что такое Spring Integration и для чего она нужна, вы можете прочитать в другой статье «Самое подробное объяснение Spring Integration».

3 Параметры конфигурации Spring Kafka

Вот описание всех конфигураций. Конфигурация Spring kafka делится на глобальную конфигурацию и конфигурацию подмодуля. Конфигурация подмодуля перезапишет глобальную конфигурацию. Например, SSL-аутентификацию можно настроить глобально, но ее также можно настроить в каждом подмодуле. -модуль, такой как потребитель, производитель, SSL можно настроить отдельно в потоковой обработке (возможно, для развертывания микросервиса, где потребители и производители не находятся в одном приложении). Здесь остановимся на конфигурации производителя и потребителя, а остальные не будем расширять, при использовании вы найдете и дополните.

3.1 Глобальная конфигурация

# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.bootstrap-servers
# 在发出请求时传递给服务器的ID。用于服务器端日志记录
spring.kafka.client-id,默认无
# 用于配置客户端的其他属性,生产者和消费者共有的属性
spring.kafka.properties.*
# 消息发送的默认主题,默认无
spring.kafka.template.default-topic

3.2 Производители

«В весеннем сапоге», Кафка生产者Связанная конфигурация (все конфигурации имеют префиксspring.kafka.producer.):

# 生产者要求Leader在考虑请求完成之前收到的确认数
spring.kafka.producer.acks
# 默认批量大小。较小的批处理大小将使批处理不太常见,并可能降低吞吐量(批处理大小为零将完全禁用批处理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生产者可用于缓冲等待发送到服务器的记录的总内存大小。
spring.kafka.producer.buffer-memory
# 在发出请求时传递给服务器的ID。用于服务器端日志记录。
spring.kafka.producer.client-id
# 生产者生成的所有数据的压缩类型
spring.kafka.producer.compression-type
# 键的序列化程序类
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大于零时,启用失败发送的重试次数
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空时,启用对生产者的事务支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 Потребители

В Spring Boot конфигурация, связанная с потребителем Kafka (все конфигурации с префиксомspring.kafka.consumer.):

# 如果“enable.auto.commit”设置为true,设置消费者偏移自动提交到Kafka的频率,默认值无,单位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 当Kafka中没有初始偏移或服务器上不再存在当前偏移时策略设置,默认值无,latest/earliest/none三个值设置
# earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.auto-offset-reset
# 用逗号分隔的主机:端口对列表,用于建立到Kafka群集的初始连接。覆盖全局连接设置属性
spring.kafka.consumer.bootstrap-servers
# 在发出请求时传递给服务器的ID,用于服务器端日志记录
spring.kafka.consumer.client-id
# 消费者的偏移量是否在后台定期提交
spring.kafka.consumer.enable-auto-commit
# 如果没有足够的数据来立即满足“fetch-min-size”的要求,则服务器在取回请求之前阻塞的最大时间量
spring.kafka.consumer.fetch-max-wait
# 服务器应为获取请求返回的最小数据量。
spring.kafka.consumer.fetch-min-size
# 标识此消费者所属的默认消费者组的唯一字符串
spring.kafka.consumer.group-id
# 消费者协调员的预期心跳间隔时间。
spring.kafka.consumer.heartbeat-interval
# 用于读取以事务方式写入的消息的隔离级别。
spring.kafka.consumer.isolation-level
# 密钥的反序列化程序类
spring.kafka.consumer.key-deserializer
# 在对poll()的单个调用中返回的最大记录数。
spring.kafka.consumer.max-poll-records
# 用于配置客户端的其他特定于消费者的属性。
spring.kafka.consumer.properties.*
# 密钥存储文件中私钥的密码。
spring.kafka.consumer.ssl.key-password
# 密钥存储文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密钥存储文件的存储密码。
spring.kafka.consumer.ssl.key-store-password
# 密钥存储的类型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL协议,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存储文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存储文件的存储密码。
spring.kafka.consumer.ssl.trust-store-password
# 信任存储区的类型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序类。
spring.kafka.consumer.value-deserializer

3.4 Слушатели

В Spring Boot конфигурация, связанная с Kafka Listener (все конфигурации с префиксомspring.kafka.listener.):

# ackMode为“COUNT”或“COUNT_TIME”时偏移提交之间的记录数
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 如果Broker上不存在至少一个配置的主题(topic),则容器是否无法启动,
# 该设置项结合Broker设置项allow.auto.create.topics=true,如果为false,则会自动创建不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非响应消费者的检查间隔时间。如果未指定持续时间后缀,则将使用秒作为单位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 Управление

spring.kafka.admin.client-id
# 如果启动时代理不可用,是否快速失败
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 Служба авторизации (JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL-аутентификация

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Потоковая обработка потоков

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Обзор основных возможностей подписки и публикации Kafka

  • Все потребители из одной группы потребителей координируют использование всех разделов темы подписки.
    • В одной и той же группе потребителей, если несколько потребителей подписываются на одну тему и один раздел, раздел будет назначен только одному из потребителей.Если потребитель не умрет, он будет назначен другому потребителю для потребления сообщений, что означает, что другие потребители наблюдают, чтобы поесть
    • В одной и той же группе потребителей, если N потребителей подпишутся на N разделов одной темы, каждому потребителю по умолчанию будет назначен раздел.
    • В одной и той же группе потребителей N потребителей подписываются на M разделов одной темы.Когда M > N, будет более одного раздела, выделенного потребителями, когда M
    • Все экземпляры-потребители, упомянутые выше, могут существовать в режиме потока или в режиме процесса.Механизм распределения разделов называется перебалансировкой.
    • Перебалансировка запускается при изменении количества участников в потребителе, перебалансировка запускается при изменении количества тем, на которые подписаны, перебалансировка запускается при изменении количества разделов тем, на которые подписаны.
    • Короче говоря, раздел может быть назначен только одному потребителю, а потребитель может быть назначен нескольким разделам.
  • Механизм управления смещением потребителей
    • Сообщения в каждом разделе темы имеют уникальное значение смещения, которое имеет последовательный порядок и имеет соответствующую связь с потребителями.Каждый раз, когда потребитель потребляет сообщение, смещение увеличивается на 1, записывается локально в потребителе и периодически. синхронизируется с сервером (Брокером), здесь можно настроить механизм синхронизации
    • Сообщения являются постоянными. Когда все потребители в группе повторно подписываются на тему, вы можете указать, следует ли начинать потребление сообщений с самого начала или начинать потребление с последнего записанного значения смещения.
  • Как установить количество разделов и потребителей
    • Мы знаем, что тематические разделы распределены по разным брокерам, и каждый раздел соответствует потребителю, поэтому обработка сообщений имеет высокую пропускную способность.
    • Раздел — это наименьшая единица для настройки параллелизма Kafka.Многопоточные потребители подключаются к нескольким разделам для потребления сообщений.С точки зрения реализации, они подключаются через сокеты, поэтому количество файловых дескрипторов также занято.
    • Создание разделов займет определенное количество памяти, не чем больше разделов, тем лучше Конечно, сообщество kafka сейчас оптимизирует эту часть, чтобы количество разделов было больше, и производительность не пострадает.

Как настраивать реплики, разделы, потребители и т. д., здесь подробно рассматриваться не будет и будет изучено позже.

5 Пример публикации-подписки

Среда, необходимая для реализации следующего примера:

  • Одноточечный сервер или кластер Kafka + Zookeeper настроен (если вы не знакомы с настройкой среды, вы можете прочитать предыдущую статью о настройке и тестировании среды Kafka) или использоватьSpring-kafka-test embedded Kafka Server
  • Среда разработки Spring Boot (2.2.1)
    • JDK (1.8 или выше)
    • STS(4.4.RELEASE)
    • Метод сборки MARVEN

5.1 Использование встроенного сервера Kafka

Мы знаем, что КафкаScala+ZookeeperBuild, вы можете загрузить пакет развертывания с официального сайта и развернуть его локально. Тем не менее, Spring Kafka Test инкапсулирует аннотированную функцию теста Kafka одним щелчком мыши для открытия сервера Kafka, что упрощает процесс разработки проверки функций, связанных с Kafka, и очень прост в использовании.

Добавьте зависимости:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka-test</artifactId>
	<scope>test</scope>
</dependency>

Запустите службу, ниже используется тестовый пример Junit для прямого запуска службы сервера Kafka, включая четыре прокси-узла,Run as JUnit Test. :

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

@EmbeddedKafkaСвязанные параметры можно установить в:

  • значение: установить количество агентов для создания
  • количество: то же, что и значение
  • порты: список номеров портов прокси
  • BrokerPropertiesLocation: укажите файл конфигурации, например, «classpath:application.properties».

Примечание. EmbeddedKafka не создает темы по умолчанию. подскажетTopic(s) [test] is/are not present and missingTopicsFatal is trueОшибка. @EmbeddedKafka По умолчанию создается брокер со случайным портом без каких-либо параметров, который выводит определенный порт и элементы конфигурации по умолчанию в журнале запуска.

5.2 Простая реализация публикации-подписки (без пользовательской конфигурации)

Далее реализуется простая функция публикации и подписки, вызывающая API через внешний веб-интерфейс, а затем производитель начинает отправлять сообщения после получения запроса в контроллере API, а потребитель прослушивает сообщение в фоновом режиме и печатает. его, если он получает потребительское сообщение.

5.2.1 Добавление зависимостей и настройка Kafka

Добавьте зависимости Кафки:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>			
</dependency>

Настройте Kafka, где потребители и производители находятся в одном приложении, нам нужно только настроить адрес службы + порт Kafka Brokers:

server: 
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
        # 设置不监听主题错误,false时,如果broker设置了llow.auto.create.topics = true,生产者发送到未创建主题时,会默认自动创建主题
        # 且默认创建的主题是单副本单分区的
        missing-topics-fatal: false
    consumer:
        # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
        auto-offset-reset: earliest

5.2.2 Добавить производителя

@Service
public class Producer {

	private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
    	LOGGER.info(String.format("===Producing message: {}", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

5.2.3 Добавление потребителей

@Service
public class Consumer {

	private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) throws IOException {
    	LOGGER.info(String.format("#### -> Consumed message -> %s", message));
    }
   
}

5.2.4 Добавить веб-контроллер

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

5.2.5 Тестирование

Добавьте приложение Spring Boot:

@SpringBootApplication
public class TestKafkaApplication {
	public static void main(String[] args) {
		SpringApplication.run(TestKafkaApplication.class, args);
	}

}

После запуска Kafka Brokers вам необходимо вручную создать тему (если вы хотите создать ее автоматически, вам нужно использовать KafkaAdmin или настроить Kafka Brokerallow.auto.create.topics=trueи наборы приложенийlistener.missing-topics-fatal=false):

# 如果对kafka-topics.sh这里不熟悉,可以去翻看前面写的关于Kafka的相关文章(环境搭建和测试那一篇)
# 创建test主题
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

Тест в открытом браузере:

http://localhost:9000/kafka/publish?message=hello

то консоль приложения напечатаетhello. Реализация всей публикации и подписки использует только связанные с Kafka@KafkaListenerАннотации получают сообщения иKafkaTemplateШаблон для отправки сообщения, это очень просто.

5.3 Публикация и подписка реализации на основе пользовательской конфигурации

Выше приведен простой способ быстро реализовать функцию публикации и подписки с помощью конфигурации Spring Kafka, от которой зависит Spring Boot. В настоящее время мы не можем управлять этими конфигурациями в программе, поэтому в этом разделе используется наша предыдущая «Spring Boot from скретч 7_latest" Метод пользовательского файла конфигурации, описанный в статье "Подробное введение в конфигурацию и приоритет файла конфигурации", реализует функцию публикации и подписки.

В состав реализации входят:

  • Пользовательский файл параметров конфигурации Kafka (не application.properties/yml)
  • Multi-producer (каждый производитель представляет собой один сервис и один поток), multi-consumer (не-@KafkaListenerосуществлять мониторинг сообщений)
  • Поддержка конфигурации безопасности SSL
  • производитель мониторов

Исходный код не будет выложен напрямую, будет дана только основная часть.

Конфигурационный файл:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    ......
    ......
}

Режиссер:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("===Producing message failed");  
            }

        });
    }
}

потребитель:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);;
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                // TODO Auto-generated method stub
                LOGGER.info("===Consuming msg: {}", data.value());
            }

        };
    }
}

наследоватьInitializingBeanПросто для инициализации его тоже можно убрать и прописать инициализацию в конструктор. И потребители, и производители здесь используют@Scope, поэтому вам нужно вручную получить экземпляр и вызвать getBean() через контекст. К тому же конфигурационный файл написан не полностью, так что тут нужно быть внимательным.

5.3 Реализация публикации и подписки на основе Spring Integration

Spring Integration также имеет адаптеры, поддерживающие Kafka.Используя Spring Integration, мы также можем быстро реализовать функцию публикации и подписки и реализовать функцию пакетного потребления группы с несколькими потребителями:

  • Реализовать пользовательский класс конфигурации Kafka
  • Использование Spring-интеграции
  • опубликовать подписаться
  • Групповое многопотребительское пакетное потребление
  • Используйте синтаксис, специфичный для домена DSL, для записи
  • Производитель публикует успешную и неудачную обработку исключений

Сначала мы можем взглянуть на общий канал обмена сообщениями Kafka:

  • KafkaProducerMessageHandler в исходящем канале используется для отправки сообщений в топик
  • KafkaMessageDrivenChannelAdapter для настройки входящих каналов и обработки сообщений

Конкретная демонстрация может относиться кОбразец в Github :

6 Резюме

В этой статье подробно описываются функции отправки и получения сообщений Spring Kafka. Другие включают краткое введение в Spring Kafka Stream, настройку параметров Spring Kafka и способы реализации функций публикации и подписки Kafka тремя способами в Spring Boot. -subscriber, безопасный транспорт SSL, Spring Integration Kafka и т. д. Статья очень длинная, схватывает общую ситуацию, сочетается с реальной ситуацией, охвачено почти основное содержание.

7 Расширение знаний

Язык выражений Spring (сокращенно SpEL) в Spring отличается от заполнителей свойств.${...}SpELвыражение помещается в#{...}in (за исключением Expression, используемого в кодовых блоках). Если в конфигурационном файле есть параметр темыspring.kafka.topics, вы можете передать параметры в файле конфигурации в аннотацию@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}").

SpELРаспространенные примеры выражений:

// 字面量
#{3.1415926}    // 浮点数
#{9.87E4}       // 科学计数法表示98700
#{'Hello'}      // String 类型
#{false}        // Boolean 类型
// 引用Bean、属性和方法
#{sgtPeppers}                                   // 使用这个bean
#{sgtPeppers.artist}                            // 引用bean中的属性
#{sgtPeppers.selectArtist()}                    // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()}      // 方法返回值的操作
#{sgtPeppers.selectArtist()?.toUpperCase()}     // 防止selectArtist()方法返回null,?表示非null则执行toUpperCase()
// 访问类作用域的方法和常量的话,使用T()这个关键的运算符
#{T(java.lang.Math)}   
#{T(java.lang.Math).PI}             // 引用PI的值
#{T(java.lang.Math).random()}       // 获取0-1的随机数
#{T(System).currentTimeMillis()}    // 获取时间到当前的毫秒数
// 替代属性占位符获取配置文件属性值
@Value("#{表达式}" 
private String variable;

8 ссылок