Добро пожаловать в публичный аккаунт автора в WeChat:Сяоха изучает Java, Продолжайте каждый день продвигать сухие статьи в поле Java и давать ежемесячные выгоды! !
Персональный сайт:woohoo.exception.site/весенняя загрузка/…
Что такое Кафка?
Kafka — это распределенное ПО промежуточного слоя для сообщений публикации и подписки и платформа потоковой обработки с открытым исходным кодом Apache Foundation. Он возник в LinkedIn и написан на Scala и Java. Он стал проектом Apache в 2011 году и проектом верхнего уровня в рамках Apache Foundation в 2012 году.
Kafka предназначена для распределенных систем с высокой пропускной способностью. По сравнению с другим промежуточным программным обеспечением для сообщений, таким как RabbitMq, Kafka имеет лучшую пропускную способность, встроенные разделы, репликацию и присущую отказоустойчивость, что делает его очень подходящим для применения в области больших данных. Кроме того, Kafka также поддерживает потребление сообщений в автономном и онлайн-режиме.
Зачем использовать Кафку
- низкая задержка- Kafka поддерживает доставку сообщений с малой задержкой, что очень быстро и может достигать 200 Вт операций записи в секунду;
- высокая производительность- Kafka имеет высокую пропускную способность как для публикации сообщений, так и для подписки. Даже при сохранении сообщений уровня TB можно гарантировать стабильную работу;
- надежность- Kafka распределена, разделена, реплицирована и отказоустойчива, что гарантирует нулевое время простоя и нулевую потерю данных.
- Масштабируемость- Kafka поддерживает горизонтальное расширение кластера.
- Долговечность— Kafka использует «распределенный журнал фиксации», в котором сообщения быстро сохраняются на диск.
Установка среды Кафка
Далее Xiaoha продемонстрирует вам, что в системе Linux принят простейший автономный метод установки, поскольку в этой статье основное внимание уделяется быстрой интеграции и интеграции Kafka со Spring Boot 2.x.
Скачать Кафку
Посетите официальный сайт Кафкиkafka.apache.org/downloads, скачайте пакет tgz, демо-версия здесь последняя версия 2.3.0.
Разархивировать, войти в каталог
После загрузки поместите его в указанное место и выполните команду для распаковки:
tar -zxvf kafka_2.11-2.3.0.tgz
После завершения распаковки перейдите в каталог Kafka:
cd kafka_2.11-2.3.0
начать смотритель зоопарка
Через каталог binzookeeper-server-start.sh
Запустите скрипт для запуска экземпляра zk с одним узлом:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
Начать Кафку
Через каталог binkafka-server-start.sh
начать:
bin/kafka-server-start.sh config/server.properties
Примечание: Kafka по умолчанию использует порт 9092. Не забудьте отключить брандмауэр.Для облачного сервера Alibaba не забудьте добавить группу безопасности.
Spring Boot 2.x начинает интегрироваться
Создайте новый веб-проект Spring Boot 2.x.
Структура проекта
добавить зависимости maven
Полные зависимости Xiaoha от maven здесь следующие:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>site.exception</groupId>
<artifactId>spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-kafka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- 阿里巴巴 fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Добавить конфигурацию кафки
Измените файл application.yml и добавьте конфигурацию, связанную с kafka:
spring:
kafka:
# 指定 kafka 地址,我这里在本地,直接就 localhost, 若外网地址,注意修改【PS: 可以指定多个】
bootstrap-servers: localhost:9092
consumer:
# 指定 group_id
group-id: group_id
auto-offset-reset: earliest
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
Об авто-смещении-сбросе
auto.offset.reset
Конфигурация имеет 3 значения, которые можно установить следующим образом:
-
earliest: Когда каждый раздел отправил
offset
, из представленныхoffset
начать потреблять; без фиксацииoffset
, начать потребление с нуля; -
latest: Когда каждый раздел отправил
offset
, из представленныхoffset
начать потреблять; без фиксацииoffset
использовать вновь сгенерированные данные в разделе; -
none:
topic
Все разделы зафиксированыoffset
когда, изoffset
Начать потребление после; пока нет зафиксированного разделаoffset
, то выбрасывается исключение;
Рекомендуется по умолчаниюearliest
, После установки этого параметра кафка перезапускается после ошибки, находит неизрасходованное смещение и может продолжать потреблять.
Последняя настройка легко потерять сообщения.Если проблема с кафкой, и данные пишутся в топик, перезапустите кафку в это время, и эта настройка начнет потребление с последнего смещения, независимо от проблем в середине.
нет Этот параметр не использовался, совместимость слишком плохая, и часто возникают проблемы.
Добавить класс заказа
В смоделированной бизнес-системе каждый раз, когда пользователь размещает заказ, отправляется сообщение для использования другими службами:
/**
* @author 犬小哈(公众号:小哈学Java)
* @date 2019/4/12
* @time 下午3:05
* @discription 订单实体类
**/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 订单id
*/
private long orderId;
/**
* 订单号
*/
private String orderNum;
/**
* 订单创建时间
*/
private LocalDateTime createTime;
}
Добавить издателя сообщения
создать новыйKafkaProvider
Класс поставщика сообщений, исходный код выглядит следующим образом:
/**
* @author 犬小哈(公众号:小哈学Java)
* @date 2019/4/12
* @time 下午3:05
* @discription 消息提供者
**/
@Component
@Slf4j
public class KafkaProvider {
/**
* 消息 TOPIC
*/
private static final String TOPIC = "xiaoha";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 构建一个订单类
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 发送消息,订单类的 json 作为消息体
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 监听回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("## Send message fail ...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("## Send message success ...");
}
});
}
}
Добавить получателя сообщения
После отправки сообщения, конечно, необходим потребитель.После того, как потребитель получит сообщение, он выполнит соответствующую бизнес-обработку.Здесь Xiaoha просто печатает тело сообщения.
Добавить кKafkaConsumer
Класс потребления:
/**
* @author 犬小哈(公众号:小哈学Java)
* @date 2019/4/12
* @time 下午3:05
* @discription 消息消费者
**/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "xiaoha", groupId = "group_id")
public void consume(String message) {
log.info("## consume message: {}", message);
}
}
пройти через@KafkaListener
Аннотация, мы можем указать, что нужно контролироватьtopic
а такжеgroupId
, обратите внимание, что здесьtopics
это массив, что означает, что мы можем указать несколькоtopic
,как:@KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id")
.
Примечание. ТЕМА издателя сообщения должна соответствовать ТЕМЕ, отслеживаемой потребителем, иначе потребитель не сможет использовать сообщение.
модульный тест
Создание новых модульных тестов, публикация функциональных тестовых сообщений и потребление.
/**
* @author 犬小哈(公众号:小哈学Java)
* @date 2019/4/12
* @time 下午3:05
* @discription
**/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 发送 1000 个消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}
Отправьте 1000 сообщений, чтобы узнать, могут ли они быть опубликованы и использованы в обычном режиме.Журнал консоли выглядит следующим образом:
Можно обнаружить, что 1000 сообщений были успешно отправлены и использованы нормально.
Давайте еще раз проверим список тем Кафки, см.xiaoha
этоtopic
Нормально ли он создается и выполняетсяbin
Посмотреть в каталогеtopic
списокkafka-topics.sh
сценарий:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Готово!
Суммировать
Сегодня Сяоха в основном поделилась с вами тем, как установить автономную версию среды kafka, как быстро интегрировать промежуточное программное обеспечение сообщений Kafka в Spring Boot 2.x, а также продемонстрировать соответствующий пример кода для публикации и использования сообщений. увидимся в следующий раз!
Исходный адрес GitHub
использованная литература
Woohoo. Моя школа 3С. Talent/Apache_Kafka…
woooooo.brief.com/afraid/oh 1 место 7 из 18 нет…