5 минут, чтобы познакомиться с Кафкой

Java

Брат-гид пообещал всем вторую оригинальную статью из серии о Кафке. Чтобы обеспечить обновление контента в режиме реального времени, я также отправил соответствующие статьи на Gihub! адрес: https://GitHub.com/snail Climb/spring boot-kafka

Связанное чтение:Начиная! Ярнакулярный берет вас, чтобы знать кафка!

Предварительные требования: на вашем компьютере установлен Docker.

основное содержание:

  1. Установить с помощью Докера
  2. Проверка функциональности очередей сообщений с помощью командной строки
  3. инструменты визуального управления zookeeper и kafka
  4. Простое использование Kafka в программах на Java

Настройте среду Kafka с установкой Docker

единое видение

Автономная версия Kafka используется в качестве демонстрации. Для изучения рекомендуется собрать автономную версию Kafka.

В следующем примере Docker используется для создания базовой среды Kafka из проекта с открытым исходным кодом:GitHub.com/simple step будет…. Конечно, вы также можете следить за официальными:GitHub.com/Загрязнение окружающей среды Мейстер….

Создайте новый с именемzk-single-kafka-single.ymlфайл, содержимое файла следующее:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-single/zoo1/data:/data
      - ./zk-single-kafka-single/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

    volumes:
      - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

Выполните следующую команду, чтобы завершить настройку среды (zookeeper и kafka будут загружены и запущены автоматически)

docker-compose -f zk-single-kafka-single.yml up

Если вам нужно остановить контейнеры, связанные с Kafka, выполните следующую команду:

docker-compose -f zk-single-kafka-single.yml down

Кластерная версия

В следующем примере Docker используется для создания базовой среды Kafka из проекта с открытым исходным кодом:GitHub.com/simple step будет….

Создайте новый с именемzk-single-kafka-multiple.ymlфайл, содержимое файла следующее:

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-multiple/zoo1/data:/data
      - ./zk-single-kafka-multiple/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zoo1


  kafka3:
    image: confluentinc/cp-kafka:5.4.0
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zoo1

Выполните следующую команду, чтобы завершить среду Kafka с 1 узлом Zookeeper + 3 узлами.

docker-compose -f zk-single-kafka-multiple.yml up

Если вам нужно остановить контейнеры, связанные с Kafka, выполните следующую команду:

docker-compose -f zk-single-kafka-multiple.yml down

Протестируйте создание и потребление сообщений с помощью командной строки.

В общем, мы редко используем операции командной строки Kafka.

1. Войдите в контейнер Kafka, чтобы выполнить некоторые команды, с которыми Kafka официально поставляется.

docker exec -ti docker_kafka1_1 bash

2. Перечислите все темы

root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181

3. Создайте тему

root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1
Created topic test.

Мы создали тему под названием test с 3 разделами и 1 репликой.

4. Потребители подписываются на темы

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer

Мы подписались на тему под названием test.

5. Производитель отправляет сообщение в тему

root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test
>send hello from console -producer
>

Мы используемkafka-console-producerКоманда отправила сообщение в тему с именем test с содержимым: «отправить привет из консоли -производитель».

В это время вы обнаружите, что потребитель успешно получил сообщение:

root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer

Рекомендуемые подключаемые модули, связанные с IDEA

Zoolytic-Zookeeper tool

Это плагин для визуализации Zookeeper, предоставленный IDEA, который очень прост в использовании! Мы можем передать это:

  1. Визуализируйте информацию об узле ZkNodes
  2. Управление узлом ZkNodes — добавить/удалить
  3. Изменить данные zkNodes
  4. ......

Фактический эффект использования выглядит следующим образом:

Инструкции:

  1. Откройте инструмент: View-> Tool Windows-> Zoolytic;
  2. После нажатия на знак "+" данные во всплывающем окне: "127.0.0.1:2181" подключены к zookeeper;
  3. После подключения нажмите на только что созданное подключение, а затем нажмите кнопку обновления рядом со знаком «+»!

Kafkalytic

Плагин визуального управления Kafka, предоставленный IDEA. Этот плагин предоставляет нам следующие функции:

  1. Поддержка нескольких кластеров
  2. Управление темами: создание/удаление/изменение разделов
  3. Поиск тем с использованием регулярных выражений
  4. Опубликовать сериализованное сообщение строки/байта
  5. Потребляйте сообщения, используя разные стратегии

Фактический эффект использования выглядит следующим образом:

Инструкции:

  1. Откройте инструмент: Вид->Окна инструментов->кафкалитик;

  2. После нажатия знака «+» подключите данные во всплывающем окне: «127.0.0.1:9092»;

Простое использование Kafka в программах на Java

Кодовый адрес:GitHub.com/snail Climb/…

Шаг 1: Создайте новый проект Maven

Step2: pom.xmlДобавить связанные зависимости

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.0</version>
        </dependency>

Шаг 3: Инимируйте потребителей и производителей

KafkaConstantsНекоторые общие константы конфигурации Kafka определены в классе констант.

public class KafkaConstants {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String CLIENT_ID = "client1";
    public static String GROUP_ID_CONFIG="consumerGroup1";
    private KafkaConstants() {

    }
}

ProducerCreatorсуществует одинcreateProducer()Методы метода используются для возвратаKafkaProducerобъект

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author shuang.kou
 */
public class ProducerCreator {


    public static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(properties);
    }
}

Есть один в потребителяхкреатcreateConsumer()Методы метода используются для возвратаKafkaConsumerобъект

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerCreator {

    public static Consumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(properties);
    }
}

Шаг 4. Отправка и использование сообщений

Производитель отправляет сообщение:

private static final String TOPIC = "test-topic";
Producer<String, String> producer = ProducerCreator.createProducer();
ProducerRecord<String, String> record =
 new ProducerRecord<>(TOPIC, "hello, Kafka!");
try {
 //send message
 RecordMetadata metadata = producer.send(record).get();
 System.out.println("Record sent to partition " + metadata.partition()
                    + " with offset " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
 System.out.println("Error in sending record");
 e.printStackTrace();
}
producer.close();

Потребители потребляют сообщения:

Consumer<String, String> consumer = ConsumerCreator.createConsumer();
// 循环消费消息
while (true) {
  //subscribe topic and consume message
  consumer.subscribe(Collections.singletonList(TOPIC));

  ConsumerRecords<String, String> consumerRecords =
    consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println("Consumer consume message:" + consumerRecord.value());
  }
}

Шаг 5: Тест

Консоль бегуна выводит:

Record sent to partition 0 with offset 20
Consumer consume message:hello, Kafka!

Рекомендация проекта с открытым исходным кодом

Другие рекомендации автора по проектам с открытым исходным кодом:

  1. JavaGuide: [Изучение Java + интервью] Обложка, которая охватывает основные знания, которые необходимо освоить большинству Java-программистов.
  2. springboot-guide: Учебное пособие по Spring Boot, подходящее для начинающих и опытных разработчиков (поддержка в свободное время, добро пожаловать в совместную поддержку).
  3. programmer-advancement: Я думаю, некоторые хорошие привычки, которые должны быть у техников!
  4. spring-security-jwt-guide:Начинать с нуля! Spring Security с JWT (включая проверку авторизации) бэкэнд-часть кода.

публика