Брат-гид пообещал всем вторую оригинальную статью из серии о Кафке. Чтобы обеспечить обновление контента в режиме реального времени, я также отправил соответствующие статьи на Gihub! адрес: https://GitHub.com/snail Climb/spring boot-kafka
Связанное чтение:Начиная! Ярнакулярный берет вас, чтобы знать кафка!
Предварительные требования: на вашем компьютере установлен Docker.
основное содержание:
- Установить с помощью Докера
- Проверка функциональности очередей сообщений с помощью командной строки
- инструменты визуального управления zookeeper и kafka
- Простое использование Kafka в программах на Java
Настройте среду Kafka с установкой Docker
единое видение
Автономная версия Kafka используется в качестве демонстрации. Для изучения рекомендуется собрать автономную версию Kafka.
В следующем примере Docker используется для создания базовой среды Kafka из проекта с открытым исходным кодом:GitHub.com/simple step будет…. Конечно, вы также можете следить за официальными:GitHub.com/Загрязнение окружающей среды Мейстер….
Создайте новый с именемzk-single-kafka-single.yml
файл, содержимое файла следующее:
version: '2.1'
services:
zoo1:
image: zookeeper:3.4.9
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.3.1
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
Выполните следующую команду, чтобы завершить настройку среды (zookeeper и kafka будут загружены и запущены автоматически)
docker-compose -f zk-single-kafka-single.yml up
Если вам нужно остановить контейнеры, связанные с Kafka, выполните следующую команду:
docker-compose -f zk-single-kafka-single.yml down
Кластерная версия
В следующем примере Docker используется для создания базовой среды Kafka из проекта с открытым исходным кодом:GitHub.com/simple step будет….
Создайте новый с именемzk-single-kafka-multiple.yml
файл, содержимое файла следующее:
version: '2.1'
services:
zoo1:
image: zookeeper:3.4.9
hostname: zoo1
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-multiple/zoo1/data:/data
- ./zk-single-kafka-multiple/zoo1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
volumes:
- ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
kafka2:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka2
ports:
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
volumes:
- ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
depends_on:
- zoo1
kafka3:
image: confluentinc/cp-kafka:5.4.0
hostname: kafka3
ports:
- "9094:9094"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
volumes:
- ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
depends_on:
- zoo1
Выполните следующую команду, чтобы завершить среду Kafka с 1 узлом Zookeeper + 3 узлами.
docker-compose -f zk-single-kafka-multiple.yml up
Если вам нужно остановить контейнеры, связанные с Kafka, выполните следующую команду:
docker-compose -f zk-single-kafka-multiple.yml down
Протестируйте создание и потребление сообщений с помощью командной строки.
В общем, мы редко используем операции командной строки Kafka.
1. Войдите в контейнер Kafka, чтобы выполнить некоторые команды, с которыми Kafka официально поставляется.
docker exec -ti docker_kafka1_1 bash
2. Перечислите все темы
root@kafka1:/# kafka-topics --describe --zookeeper zoo1:2181
3. Создайте тему
root@kafka1:/# kafka-topics --create --topic test --partitions 3 --zookeeper zoo1:2181 --replication-factor 1
Created topic test.
Мы создали тему под названием test с 3 разделами и 1 репликой.
4. Потребители подписываются на темы
root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
Мы подписались на тему под названием test.
5. Производитель отправляет сообщение в тему
root@kafka1:/# kafka-console-producer --broker-list localhost:9092 --topic test
>send hello from console -producer
>
Мы используемkafka-console-producer
Команда отправила сообщение в тему с именем test с содержимым: «отправить привет из консоли -производитель».
В это время вы обнаружите, что потребитель успешно получил сообщение:
root@kafka1:/# kafka-console-consumer --bootstrap-server localhost:9092 --topic test
send hello from console -producer
Рекомендуемые подключаемые модули, связанные с IDEA
Zoolytic-Zookeeper tool
Это плагин для визуализации Zookeeper, предоставленный IDEA, который очень прост в использовании! Мы можем передать это:
- Визуализируйте информацию об узле ZkNodes
- Управление узлом ZkNodes — добавить/удалить
- Изменить данные zkNodes
- ......
Фактический эффект использования выглядит следующим образом:
Инструкции:
- Откройте инструмент: View-> Tool Windows-> Zoolytic;
- После нажатия на знак "+" данные во всплывающем окне: "127.0.0.1:2181" подключены к zookeeper;
- После подключения нажмите на только что созданное подключение, а затем нажмите кнопку обновления рядом со знаком «+»!
Kafkalytic
Плагин визуального управления Kafka, предоставленный IDEA. Этот плагин предоставляет нам следующие функции:
- Поддержка нескольких кластеров
- Управление темами: создание/удаление/изменение разделов
- Поиск тем с использованием регулярных выражений
- Опубликовать сериализованное сообщение строки/байта
- Потребляйте сообщения, используя разные стратегии
Фактический эффект использования выглядит следующим образом:
Инструкции:
-
Откройте инструмент: Вид->Окна инструментов->кафкалитик;
-
После нажатия знака «+» подключите данные во всплывающем окне: «127.0.0.1:9092»;
Простое использование Kafka в программах на Java
Кодовый адрес:GitHub.com/snail Climb/…
Шаг 1: Создайте новый проект Maven
Step2: pom.xml
Добавить связанные зависимости
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
Шаг 3: Инимируйте потребителей и производителей
KafkaConstants
Некоторые общие константы конфигурации Kafka определены в классе констант.
public class KafkaConstants {
public static final String BROKER_LIST = "localhost:9092";
public static final String CLIENT_ID = "client1";
public static String GROUP_ID_CONFIG="consumerGroup1";
private KafkaConstants() {
}
}
ProducerCreator
существует одинcreateProducer()
Методы метода используются для возвратаKafkaProducer
объект
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author shuang.kou
*/
public class ProducerCreator {
public static Producer<String, String> createProducer() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
properties.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(properties);
}
}
Есть один в потребителяхкреатcreateConsumer()
Методы метода используются для возвратаKafkaConsumer
объект
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public class ConsumerCreator {
public static Consumer<String, String> createConsumer() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER_LIST);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID_CONFIG);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(properties);
}
}
Шаг 4. Отправка и использование сообщений
Производитель отправляет сообщение:
private static final String TOPIC = "test-topic";
Producer<String, String> producer = ProducerCreator.createProducer();
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, "hello, Kafka!");
try {
//send message
RecordMetadata metadata = producer.send(record).get();
System.out.println("Record sent to partition " + metadata.partition()
+ " with offset " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
System.out.println("Error in sending record");
e.printStackTrace();
}
producer.close();
Потребители потребляют сообщения:
Consumer<String, String> consumer = ConsumerCreator.createConsumer();
// 循环消费消息
while (true) {
//subscribe topic and consume message
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, String> consumerRecords =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("Consumer consume message:" + consumerRecord.value());
}
}
Шаг 5: Тест
Консоль бегуна выводит:
Record sent to partition 0 with offset 20
Consumer consume message:hello, Kafka!
Рекомендация проекта с открытым исходным кодом
Другие рекомендации автора по проектам с открытым исходным кодом:
- JavaGuide: [Изучение Java + интервью] Обложка, которая охватывает основные знания, которые необходимо освоить большинству Java-программистов.
- springboot-guide: Учебное пособие по Spring Boot, подходящее для начинающих и опытных разработчиков (поддержка в свободное время, добро пожаловать в совместную поддержку).
- programmer-advancement: Я думаю, некоторые хорошие привычки, которые должны быть у техников!
- spring-security-jwt-guide:Начинать с нуля! Spring Security с JWT (включая проверку авторизации) бэкэнд-часть кода.