Способность Kafka накапливать сообщения относительно сильна, и она может накапливать сотни миллионов сообщений.Он особенно подходит для сценариев с низкими требованиями к реальному времени, такими как обработка журналов.В то же время он поддерживает развертывание кластера и имеет более высокую способность накопления и надежность, чем редис.
Полный код проекта выложен на github:GitHub.com/аккуратная жизнь/нет…
Вы можете быстро начать работу с этой кафкой, выполнив следующие действия.
Получить доступный экземпляр kafka
Вы можете использовать докер для запуска кластера kafka одним щелчком мыши, см.:GitHub.com/simple step будет…
git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d
Эффект операции следующий
использовать командуdocker-compose -f full-stack.yml ps
Получите порт, который kafka может прослушивать
Запишите адрес 9092, который слушает kafka, который будет использоваться позже
Порт 8000 - это пользовательский интерфейс этой темы кафки, этот интерфейс может просматривать текущий список тем, эффект следующий
Здесь же можно посмотреть сохраненные в теме данные
Подготовить кейс-проект
Тестовые проекты можно создавать на https://start.spring.io/
Необходимо добавить следующие три пакета- spring-boot-starter-web
- spring-kafka
- lombok
существуетappliation.properties
Настройте адрес kafka и идентификатор группы, используемый в , имя идентификатора группы можно определить самостоятельно, например: myconsumergroup
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup
Отправить сообщение с помощью клиента kafka
Используйте службу весенней загрузки для инкапсуляции кода для отправки сообщений из kafka.Основной код выглядит следующим образом
package mykafka.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private final KafkaTemplate<String, String> kafkaTemplate;
private String topic = "自行定义的topic";
Producer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(String message) {
this.kafkaTemplate.send(topic, message);
System.out.println("Sent sample message [" + message + "] to " + topic);
}
}
Затем напишите интерфейс для вызова службы, которая отправляет сообщения kafka.Основной код выглядит следующим образом:
@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {
private final Producer producer;
@RequestMapping("/test1")
public String test1() {
producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
return "test1";
}
}
Примечание. Тема kafka, используемая в приведенном выше коде, может быть определена вами самостоятельно, например, mytopic
Затем войдите в этот интерфейс ip:8080/test1 в браузере.
Вы можете увидеть сообщения, отправленные в kafka, в пользовательском интерфейсе этой темы kafka.Вы можете видеть, что сообщение было отправлено в kafka
Использование сообщений
Чтобы использовать сообщения, вам нужно только добавить KafkaListener в метод и указать тему и groupId.
Основной код выглядит следующим образом
@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
log.info(
"received message, topic: {}, partition: {}, offset: {}, message: {}",
topics.get(0),
partitions.get(0),
offsets.get(0),
message
);
}
Эффект операции следующий:
Вы можете видеть, что сообщение в kafka было успешно полученодругие клиенты
php отправить и использовать клиентскую ссылку:GitHub.com/AR Возьми U-Shield-Boss/Боюсь…
ссылка на клиента:GitHub.com/confluent в…
некоторые замечания
Отправка и использование сообщений должны обеспечивать согласованность темы.
Журнал можно сначала отправить в kafka для буферизации, а затем сообщение можно вывести через клиент kafka и поместить в систему хранения логов, например elk, для анализа и визуализации.
Поскольку клиент kafka отправляет сообщение, а сервер сохраняет сообщение на диск, это асинхронные операции, поэтому сообщение может быть потеряно после остановки сервера.Если требования к надежности выше, вы можете использовать улучшенную версию kafka: RocketMQ