Когда Весна встречает Кафку, количество интересных знаний увеличивается

Spring Kafka
Когда Весна встречает Кафку, количество интересных знаний увеличивается

0. Прочитав эту статью, вы узнаете

  • Некоторые распространенные команды Linux
  • Как установить JDK, ZooKeeper, Kafka на Linux
  • Легкая интеграция Spring и Kafka

Первоначально Kafka была распределенной системой обмена сообщениями на основе ZooKeeper с несколькими разделами и несколькими копиями, разработанной LinkedIn с использованием языка Scala и переданной в дар Apache Foundation.

В настоящее время Kafka позиционируется как распределенная платформа обработки потоков, которая широко используется благодаря высокой пропускной способности, устойчивости, горизонтальной масштабируемости и поддержке потоковой обработки данных.

Есть и другая история о происхождении имени Кафка. Если у вас хорошая память, вы должны помнить школьный текст под названием «Метаморфозы», написанный австрийским писателем.Franz Kafka. Сам автор тоже очень любит свой "Замок". Автору Apache Kafka тоже очень нравился Франц Кафка, когда он учился в колледже, поэтому он назвал систему Kafka.

А теперь давайте включим компьютер и потренируемся вместе!

Если на вашем компьютере установлена ​​Kafka, вы можете пропустить первую часть и сразу перейти ко второй.

1. Установка и настройка Кафки

Перед установкой Kafka нам нужно установить Java и ZooKeeper.

1.1 Установите JDK

1. Проверьте, установлена ​​ли в системе Java.

Перед установкой JDK мы должны сначала подтвердить, установлен ли JDK в системе, следующим образом:

rem -qa | grep java
rem -qa | grep jdk
rem -qa | grep gcj

Если информации нет, значит, в системе не установлена ​​Java.

Если вы хотите удалить установленный JDK, вы можете выполнить следующую команду.

rpm -qa | grep java | xargs rpm -e --nodeps

2. Установите Java

Приступим к установке Java, здесь для примера возьмем 1.8.

yum list java-1.8*

С помощью этой команды мы можем увидеть все файлы версии Java 1.8.

java-1.8.0-openjdk.x86_64                                                                                                                                  
java-1.8.0-openjdk-accessibility.x86_64                                                                                                                    
java-1.8.0-openjdk-demo.x86_64                                                                                                                             
java-1.8.0-openjdk-devel.x86_64                                                                                                                            
java-1.8.0-openjdk-headless.x86_64                                                                                                                         
java-1.8.0-openjdk-headless-slowdebug.x86_64                                                                                                               
java-1.8.0-openjdk-javadoc.noarch                                                                                                                          
java-1.8.0-openjdk-javadoc-zip.noarch                                                                                                                      
java-1.8.0-openjdk-slowdebug.x86_64                                                                                                                        
java-1.8.0-openjdk-src.x86_64        

Затем мы можем установить все файлы для версии Java 1.8 с помощью этой команды.

yum install java-1.8.0-openjdk* -y

когда консоль вернетсяCompleteПосле этого он показывает, что Java была успешно установлена.

3. Подтвердите, что установка Java прошла успешно.

Используйте следующую команду для подтверждения

java -version

Результат отображается следующим образом, указывая на то, что установка прошла успешно.

При использовании yum для установки переменные среды настраиваются автоматически.

openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)

1.2 Установите ZooKeeper

1. Создайте каталог данных и загрузите ZooKeeper версии 3.7.0.

mkdir /data
cd /data
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

2. Распаковать

tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz

3. Измените файл конфигурации

// 进入配置文件目录
cd apache-zookeeper-3.7.0/conf

// 将zoo_sample.cfg这个文件复制为zoo.cfg 
cp zoo_sample.cfg  zoo.cfg

// 修改配置文件
vi zoo.cfg

После входа в vi zoo.cfg вам нужно нажать i, чтобы войти в режим вставки и внести изменения. После изменения нажмите ESC, чтобы выйти из режима вставки, войдите в режим командной строки, а затем нажмите два последовательных ZZ в верхнем регистре, чтобы сохранить и выйти.

будетdataDir=/tmp/zookeeperизменился наdataDir=/data/apache-zookeeper-3.7.0-bin/data

3. Создайте соответствующий каталог данных

mkdir /data/apache-zookeeper-3.7.0-bin/data

4. Запустите ZooKeeper

Войдите в каталог bin ZooKeeper и запустите службу.

cd /data/apache-zookeeper-3.7.0-bin/bin
./zkServer.sh start

После успешного завершения Zookeeper появится следующая информация:

/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

Вот несколько других часто используемых команд

// 停止
./zkServer.sh stop

// 重启
./zkServer.sh restart

// 查看状态
./zkServer.sh status

1.3 Установить кафку

1. Скачать какфа версии 3.0.0

cd /data
wget https://mirrors.bfsu.edu.cn/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz

2. Распаковать

tar -zxvf kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0

3. Старт

config/server.propertiesсерединаzookeeper.connectАдрес по умолчаниюlocalhost:2181, если ваш Zookeeper установлен локально, оставьте значение по умолчанию.

cd kafka_2.13-3.0.0.tgz kafka_2.13-3.0.0

// 前台启动:bin/kafka-server-start.sh config/server.properties
// 下面的命令行是后台启动,不会像前台启动一直打印日记。
bin/kafka-server-start.sh -daemon config/server.properties

Теперь, когда вы успешно запустили Kafka, поздравляем вас с первым шагом!

2. Интеграция Spring и Kafka

2.1 Настроить пом

Нам нужно добавить зависимости Kafka в pom.xml:

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId> 
    <version>2.7.2</version> 
</dependency>

Демонстрационное приложение в этой статье будет приложением Spring Boot, вы можетездесьСоздайте приложение Spring Boot быстро и легко.

2.2 Настройка тем

Давайте сначала рассмотрим, что такое тема:

В Kafka атрибут категории используется для разделения класса, к которому принадлежат данные, а класс, который разделяет данные, называется топиком. Если Kafka рассматривать как базу данных, топик можно понимать как таблицу в базе данных, а имя топика — это имя таблицы.

Прежде чем мы сможем создать тему из командной строки

bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

Теперь, благодаря внедрению AdminClient в Kafka, мы можем создавать темы в программе. Нам нужно добавить бин KafkaAdmin, который может автоматически заносить темы всех бинов NewTopic.

@Configuration
public class KafkaTopicConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("jayxu", 1, (short) 1);
    }
}

2.3 Производственные сообщения

Чтобы создавать сообщения, нам сначала нужно настроить ProducerFactory. ProducerFactory устанавливает стратегию создания экземпляров Kafka Producer.

Затем нам нужен KafkaTemplate, который обертывает экземпляр Producer и предоставляет методы для отправки сообщений в Kafka Topic.

Экземпляры производителя являются потокобезопасными. Использование синглетонов во всей среде приложения приведет к повышению производительности. Экземпляры KakfaTemplate также являются потокобезопасными, поэтому рекомендуется использовать их.

2.3.1 Конфигурация производителя

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2.3.2 Публикация сообщения

Мы можем использовать KafkaTemplate для отправки сообщений.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

sendAPI возвращает объект ListenableFuture. Если мы хотим заблокировать отправляющий поток и получить результат об отправленном сообщении, мы можем вызвать get API объекта ListenableFuture. Поток будет ждать результата, но это замедлит производителя.

Kafka — это платформа для быстрой обработки потоков. Поэтому лучше обрабатывать результаты асинхронно, чтобы последующие сообщения не ждали результатов предыдущего сообщения.

Мы можем сделать это с помощью обратных вызовов:

public void sendMessage(String message) {

    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(topicName, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message +
                    "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=["
                    + message + "] due to : " + ex.getMessage());
        }
    });
}

2.4 Использование сообщений

2.4.1 Конфигурация потребителя

Чтобы использовать сообщения, нам нужно настроить ConsumerFactory и KafkaListenerContainerFactory. Как только эти bean-компоненты станут доступны в фабрике bean-компонентов Spring, потребитель на основе POJO можно настроить с помощью аннотации @KafkaListener.

Аннотация @EnableKafka требуется в классе конфигурации для обнаружения аннотации @KafkaListener в bean-компонентах, управляемых Spring.

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                groupId);
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

2.4.2 Использование сообщений

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

Мы можем реализовать несколько слушателей для темы, каждый с другим идентификатором группы. Кроме того, потребитель может прослушивать сообщения из разных тем.

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring также поддерживает извлечение одного или нескольких заголовков сообщений с помощью аннотации @Header в слушателях.

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: " + message"
                    + "from partition: " + partition);
}

2.4.3 Использование информации о конкретном разделе

Обратите внимание, что тема, которую мы создалиjayxuЕсть только один раздел.

Однако для темы с несколькими разделами @KafkaListener может явно подписаться на тему с несколькими разделами.initial offsetКонкретный раздел темы.

@KafkaListener(
        topicPartitions = @TopicPartition(topic = "topicName",
                partitionOffsets = {
                        @PartitionOffset(partition = "0", initialOffset = "0"),
                        @PartitionOffset(partition = "3", initialOffset = "0")}),
        containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: " + message"
                    + "from partition: " + partition);
}

Поскольку в этом слушателеinitialOffsetимеет значение 0, поэтому каждый раз, когда этот прослушиватель инициализируется, все ранее использованные сообщения для разделов 0 и 3 используются повторно.

Если нам не нужно устанавливать смещение, мы можем использовать атрибут partitions аннотации @TopicPartition, чтобы установить только раздел без смещения.

@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

2.4.4 Добавление фильтра сообщений в прослушиватель

Мы можем настроить прослушиватель для приема определенных типов сообщений, добавив собственный фильтр. Это можно сделать, задав для RecordFilterStrategy значение KafkaListenerContainerFactory.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
            record -> record.value().contains("World"));
    return factory;
}

Затем мы можем настроить прослушиватель для использования этой фабрики контейнеров.

@KafkaListener(
        topics = "topicName",
        containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

В этом прослушивателе все сообщения, соответствующие фильтру, будут отброшены.

2.5 Пользовательские конвертеры сообщений

До сих пор мы рассматривали только сообщения, которые отправляют и получают строки. Однако мы также можем отправлять и получать пользовательские объекты Java. Для этого требуется настроить соответствующий сериализатор в ProducerFactory и десериализатор в ConsumerFactory.

Давайте посмотрим на простой класс компонента, который мы отправим в виде сообщения.

public class Greeting {

    private String msg;
    private String name;

    // standard getters, setters and constructor
}

2.5.1 Создание пользовательских сообщений

В этом примере мы будем использовать JsonSerializer.

Давайте посмотрим на код ProducerFactory и KafkaTemplate.

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

Мы можем использовать этот новый KafkaTemplate для отправки приветственной информации.

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

2.5.2 Использование пользовательских сообщений

Аналогичным образом давайте изменим ConsumerFactory и KafkaListenerContainerFactory, чтобы правильно десериализовать приветственное сообщение.

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(Greeting.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

Сериализатор и десериализатор JSON Spring-kafka использует библиотеку Jackson, которая также является дополнительной зависимостью Maven от проекта spring-kafka.

Итак, давайте добавим это в наш pom.xml.

<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version> </dependency>

Рекомендуется не использовать последнюю версию Джексона, а использовать версию, добавленную в pom.xml spring-kafka.

Наконец, нам нужно написать прослушиватель для приема приветственных сообщений.

@KafkaListener(
        topics = "topicName",
        containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

3. Резюме

В этой статье мы рассмотрели, как установить Kafka и основы того, как Spring поддерживает Apache Kafka. Мы кратко рассмотрели классы, используемые для отправки и получения сообщений.

Перед запуском кода убедитесь, что сервер Kafka запущен и тема создана вручную.

Спасибо за просмотр, и, пожалуйста, поддержите, если можете! Вы также можете посетить мою домашнюю страницу, чтобы следить за моими последующими статьями, большое спасибо!