Запустить систему накопления логов kafka за три минуты при весенней загрузке

Kafka

Способность Kafka накапливать сообщения относительно сильна, и она может накапливать сотни миллионов сообщений.Он особенно подходит для сценариев с низкими требованиями к реальному времени, такими как обработка журналов.В то же время он поддерживает развертывание кластера и имеет более высокую способность накопления и надежность, чем редис.

Полный код проекта выложен на github:GitHub.com/аккуратная жизнь/нет…

Вы можете быстро начать работу с этой кафкой, выполнив следующие действия.

Получить доступный экземпляр kafka

Вы можете использовать докер для запуска кластера kafka одним щелчком мыши, см.:GitHub.com/simple step будет…

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d

Эффект операции следующий

использовать командуdocker-compose -f full-stack.yml psПолучите порт, который kafka может прослушивать

Запишите адрес 9092, который слушает kafka, который будет использоваться позже

Порт 8000 - это пользовательский интерфейс этой темы кафки, этот интерфейс может просматривать текущий список тем, эффект следующий

Здесь же можно посмотреть сохраненные в теме данные

Подготовить кейс-проект

Тестовые проекты можно создавать на https://start.spring.io/

Необходимо добавить следующие три пакета

  1. spring-boot-starter-web
  2. spring-kafka
  3. lombok

существуетappliation.propertiesНастройте адрес kafka и идентификатор группы, используемый в , имя идентификатора группы можно определить самостоятельно, например: myconsumergroup

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup

Отправить сообщение с помощью клиента kafka

Используйте службу весенней загрузки для инкапсуляции кода для отправки сообщений из kafka.Основной код выглядит следующим образом

package mykafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer {

    private final KafkaTemplate<String, String> kafkaTemplate;


    private String topic = "自行定义的topic";

    Producer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        this.kafkaTemplate.send(topic, message);
        System.out.println("Sent sample message [" + message + "] to " + topic);
    }

}

Затем напишите интерфейс для вызова службы, которая отправляет сообщения kafka.Основной код выглядит следующим образом:

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {

    private final Producer producer;

    @RequestMapping("/test1")
    public String test1() {
        producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
        return "test1";
    }
}

Примечание. Тема kafka, используемая в приведенном выше коде, может быть определена вами самостоятельно, например, mytopic

Затем войдите в этот интерфейс ip:8080/test1 в браузере.

Вы можете увидеть сообщения, отправленные в kafka, в пользовательском интерфейсе этой темы kafka.

Вы можете видеть, что сообщение было отправлено в kafka

Использование сообщений

Чтобы использовать сообщения, вам нужно только добавить KafkaListener в метод и указать тему и groupId.

Основной код выглядит следующим образом

@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                           @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    log.info(
            "received message, topic: {}, partition: {}, offset: {}, message: {}",
            topics.get(0),
            partitions.get(0),
            offsets.get(0),
            message
    );
}

Эффект операции следующий:

Вы можете видеть, что сообщение в kafka было успешно получено

другие клиенты

php отправить и использовать клиентскую ссылку:GitHub.com/AR Возьми U-Shield-Boss/Боюсь…

ссылка на клиента:GitHub.com/confluent в…

некоторые замечания

Отправка и использование сообщений должны обеспечивать согласованность темы.

Журнал можно сначала отправить в kafka для буферизации, а затем сообщение можно вывести через клиент kafka и поместить в систему хранения логов, например elk, для анализа и визуализации.

Поскольку клиент kafka отправляет сообщение, а сервер сохраняет сообщение на диск, это асинхронные операции, поэтому сообщение может быть потеряно после остановки сервера.Если требования к надежности выше, вы можете использовать улучшенную версию kafka: RocketMQ

Ссылка на ссылку

  1. Woohoo. Принесите арлингтонского терьера.com/spring-kafka…
  2. Woohoo. Положите Arlington Terrier.com/spring-in JE…
  3. docs.confluent.IO/current/processing…