Spring Boot 2.0 быстро интегрирует и интегрирует промежуточное ПО для сообщений Kafka

Spring Boot
Spring Boot 2.0 быстро интегрирует и интегрирует промежуточное ПО для сообщений Kafka

Добро пожаловать в публичный аккаунт автора в WeChat:Сяоха изучает Java, Продолжайте каждый день продвигать сухие статьи в поле Java и давать ежемесячные выгоды! !

Персональный сайт:woohoo.exception.site/весенняя загрузка/…

Что такое Кафка?

Kafka — это распределенное ПО промежуточного слоя для сообщений публикации и подписки и платформа потоковой обработки с открытым исходным кодом Apache Foundation. Он возник в LinkedIn и написан на Scala и Java. Он стал проектом Apache в 2011 году и проектом верхнего уровня в рамках Apache Foundation в 2012 году.

Kafka предназначена для распределенных систем с высокой пропускной способностью. По сравнению с другим промежуточным программным обеспечением для сообщений, таким как RabbitMq, Kafka имеет лучшую пропускную способность, встроенные разделы, репликацию и присущую отказоустойчивость, что делает его очень подходящим для применения в области больших данных. Кроме того, Kafka также поддерживает потребление сообщений в автономном и онлайн-режиме.

Зачем использовать Кафку

  • низкая задержка- Kafka поддерживает доставку сообщений с малой задержкой, что очень быстро и может достигать 200 Вт операций записи в секунду;
  • высокая производительность- Kafka имеет высокую пропускную способность как для публикации сообщений, так и для подписки. Даже при сохранении сообщений уровня TB можно гарантировать стабильную работу;
  • надежность- Kafka распределена, разделена, реплицирована и отказоустойчива, что гарантирует нулевое время простоя и нулевую потерю данных.
  • Масштабируемость- Kafka поддерживает горизонтальное расширение кластера.
  • Долговечность— Kafka использует «распределенный журнал фиксации», в котором сообщения быстро сохраняются на диск.

Установка среды Кафка

Далее Xiaoha продемонстрирует вам, что в системе Linux принят простейший автономный метод установки, поскольку в этой статье основное внимание уделяется быстрой интеграции и интеграции Kafka со Spring Boot 2.x.

Скачать Кафку

Посетите официальный сайт Кафкиkafka.apache.org/downloads, скачайте пакет tgz, демо-версия здесь последняя версия 2.3.0.

Разархивировать, войти в каталог

После загрузки поместите его в указанное место и выполните команду для распаковки:

tar -zxvf kafka_2.11-2.3.0.tgz 

После завершения распаковки перейдите в каталог Kafka:

cd kafka_2.11-2.3.0

начать смотритель зоопарка

Через каталог binzookeeper-server-start.shЗапустите скрипт для запуска экземпляра zk с одним узлом:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Начать Кафку

Через каталог binkafka-server-start.shначать:

bin/kafka-server-start.sh  config/server.properties

Примечание: Kafka по умолчанию использует порт 9092. Не забудьте отключить брандмауэр.Для облачного сервера Alibaba не забудьте добавить группу безопасности.

Spring Boot 2.x начинает интегрироваться

Создайте новый веб-проект Spring Boot 2.x.

Структура проекта

добавить зависимости maven

Полные зависимости Xiaoha от maven здесь следующие:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>site.exception</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      
       	<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Добавить конфигурацию кафки

Измените файл application.yml и добавьте конфигурацию, связанную с kafka:

spring:
  kafka:
    # 指定 kafka 地址,我这里在本地,直接就 localhost, 若外网地址,注意修改【PS: 可以指定多个】
    bootstrap-servers: localhost:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer

Об авто-смещении-сбросе

auto.offset.resetКонфигурация имеет 3 значения, которые можно установить следующим образом:

  • earliest: Когда каждый раздел отправилoffset, из представленныхoffsetначать потреблять; без фиксацииoffset, начать потребление с нуля;
  • latest: Когда каждый раздел отправилoffset, из представленныхoffsetначать потреблять; без фиксацииoffsetиспользовать вновь сгенерированные данные в разделе;
  • none: topicВсе разделы зафиксированыoffsetкогда, изoffsetНачать потребление после; пока нет зафиксированного разделаoffset, то выбрасывается исключение;

Рекомендуется по умолчаниюearliest, После установки этого параметра кафка перезапускается после ошибки, находит неизрасходованное смещение и может продолжать потреблять.

Последняя настройка легко потерять сообщения.Если проблема с кафкой, и данные пишутся в топик, перезапустите кафку в это время, и эта настройка начнет потребление с последнего смещения, независимо от проблем в середине.

нет Этот параметр не использовался, совместимость слишком плохая, и часто возникают проблемы.

Добавить класс заказа

В смоделированной бизнес-системе каждый раз, когда пользователь размещает заказ, отправляется сообщение для использования другими службами:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 订单实体类
 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单id
     */
    private long orderId;
    /**
     * 订单号
     */
    private String orderNum;
    /**
     * 订单创建时间
     */
    private LocalDateTime createTime;
}

Добавить издателя сообщения

создать новыйKafkaProviderКласс поставщика сообщений, исходный код выглядит следующим образом:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息提供者
 **/
@Component
@Slf4j
public class KafkaProvider {

    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "xiaoha";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 构建一个订单类
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 发送消息,订单类的 json 作为消息体
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("## Send message success ...");
            }
        });
    }
}

Добавить получателя сообщения

После отправки сообщения, конечно, необходим потребитель.После того, как потребитель получит сообщение, он выполнит соответствующую бизнес-обработку.Здесь Xiaoha просто печатает тело сообщения.

Добавить кKafkaConsumerКласс потребления:

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息消费者
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "xiaoha", groupId = "group_id")
    public void consume(String message) {
        log.info("## consume message: {}", message);
    }
}

пройти через@KafkaListenerАннотация, мы можем указать, что нужно контролироватьtopicа такжеgroupId, обратите внимание, что здесьtopicsэто массив, что означает, что мы можем указать несколькоtopic,как:@KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id").

Примечание. ТЕМА издателя сообщения должна соответствовать ТЕМЕ, отслеживаемой потребителем, иначе потребитель не сможет использовать сообщение.

модульный тест

Создание новых модульных тестов, публикация функциональных тестовых сообщений и потребление.

/**
 * @author 犬小哈(公众号:小哈学Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {

    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        // 发送 1000 个消息
        for (int i = 0; i < 1000; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }

        TimeUnit.MINUTES.sleep(1);
    }
}

Отправьте 1000 сообщений, чтобы узнать, могут ли они быть опубликованы и использованы в обычном режиме.Журнал консоли выглядит следующим образом:

Можно обнаружить, что 1000 сообщений были успешно отправлены и использованы нормально.

Давайте еще раз проверим список тем Кафки, см.xiaohaэтоtopicНормально ли он создается и выполняетсяbinПосмотреть в каталогеtopicсписокkafka-topics.shсценарий:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Готово!

Суммировать

Сегодня Сяоха в основном поделилась с вами тем, как установить автономную версию среды kafka, как быстро интегрировать промежуточное программное обеспечение сообщений Kafka в Spring Boot 2.x, а также продемонстрировать соответствующий пример кода для публикации и использования сообщений. увидимся в следующий раз!

Исходный адрес GitHub

GitHub.com/для моей любви/…

использованная литература

zh.wikipedia.org/wiki/Kafka

Woohoo. Моя школа 3С. Talent/Apache_Kafka…

nuggets.capable/post/684490…

woooooo.brief.com/afraid/oh 1 место 7 из 18 нет…

Добро пожаловать в публичный аккаунт WeChat: Сяоха изучает Java

关注微信公众号【小哈学Java】,回复【资源】,即可免费无套路领取资源链接哦
Подпишитесь на общедоступную учетную запись WeChat [Xiao Ha Learn Java], ответьте на [Ресурсы], вы можете получить ссылку на ресурс бесплатно без каких-либо процедур.