Адрес фактического центра электронной коммерции SpringBoot (40k+star):GitHub.com/macro-positive/…
Резюме
Kafka — очень популярное промежуточное ПО для обмена сообщениями, согласно официальному сайту, его используют тысячи компаний. Недавно я практиковал волну Кафки, которая действительно хороша и мощна. Сегодня мы изучим Kafka с трех сторон: установка Kafka под Linux, инструмент визуализации Kafka и совместное использование Kafka и SpringBoot. Я надеюсь, что вы сможете быстро начать работу с Kafka и освоить это популярное промежуточное ПО для сообщений после его прочтения!
Знакомство с Кафкой
Кафка сделанаLinkedIn
Платформа распределенного потока сообщений с открытым исходным кодом, разработанная компанией, написанная на Scala и Java. Основная функция — предоставить унифицированную платформу с высокой пропускной способностью и малой задержкой для обработки данных в реальном времени.发布订阅模式
система обработки сообщений.
Кафка обладает следующими характеристиками:
- Высокая пропускная способность и низкая задержка: Kafka очень быстро отправляет и получает сообщения, а задержка при обработке сообщений с использованием кластера может составлять всего 2 мс.
- Высокая масштабируемость: Kafka может эластично расширяться и сжиматься, а также масштабироваться до тысяч брокеров, сотен тысяч разделов и обрабатывать триллионы сообщений в день.
- Постоянное хранилище: Kafka может безопасно хранить данные в распределенном, надежном, отказоустойчивом кластере.
- Высокая доступность: Kafka может эффективно расширять кластер в зоне доступности.Если узел выходит из строя, кластер все еще может нормально работать.
Кафка инсталляция
Мы будем использовать метод установки под Linux, а среда установки — CentOS 7.6. Docker здесь не используется для установки и развертывания Лично мне проще установить напрямую (в основном из-за того, что официальный образ Docker не предоставляется)!
- Сначала нам нужно скачать установочный пакет Kafka, адрес загрузки:зеркала.Встречное требование.Квота.Способность/Apache/Kafka…
- После завершения загрузки извлеките Kafka в указанный каталог:
cd /mydata/kafka/
tar -xzf kafka_2.13-2.8.0.tgz
- После завершения распаковки перейдите в каталог распаковки:
cd kafka_2.13-2.8.0
- Хотя есть новости, что Kafka собирается удалить Zookeeper, в последней версии Kafka он не был удален, так что вам все равно нужно запустить Zookeeper перед запуском Kafka;
- Запустите службу Zookeeper, служба будет работать в
2181
порт;
# 后台运行服务,并把日志输出到当前文件夹下的zookeeper-out.file文件中
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
- Поскольку Kafka в настоящее время развернута на сервере Linux, если вы хотите получить доступ к внешней сети, вам необходимо изменить файл конфигурации Kafka.
config/server.properties
, измените адрес прослушивания Kafka, иначе он не сможет подключиться;
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.5.78:9092
- Наконец, запустите службу Kafka, служба будет работать в
9092
порт.
# 后台运行服务,并把日志输出到当前文件夹下的kafka-out.file文件中
nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &
Операции командной строки Kafka
Далее мы используем командную строку для работы с Kafka и знакомимся с использованием Kafka.
- Сначала создайте
consoleTopic
Тема;
bin/kafka-topics.sh --create --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- Затем проверьте Тему;
bin/kafka-topics.sh --describe --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- Отобразится следующая информация о теме;
Topic: consoleTopic TopicId: tJmxUQ8QRJGlhCSf2ojuGw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: consoleTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- Отправить сообщение в тему:
bin/kafka-console-producer.sh --topic consoleTopic --bootstrap-server 192.168.5.78:9092
- Вы можете отправлять информацию прямо в командной строке;
- Снова откройте окно и получите сообщение из темы с помощью следующей команды:
bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092
Кафка визуализация
Использование командной строки для управления Kafka действительно немного громоздко, давайте попробуем следующий инструмент визуализации
kafka-eagle
.
Установить JDK
Если вы используете CentOS, полная версия JDK по умолчанию не установлена, вам нужно установить ее самостоятельно!
- Загрузите JDK 8, адрес загрузки:Зеркала. Дыхание. Его четырехлетний план. Квота. Талант/принять открытый JD…
- После завершения загрузки извлеките JDK в указанный каталог;
cd /mydata/java
tar -zxvf OpenJDK8U-jdk_x64_linux_xxx.tar.gz
mv OpenJDK8U-jdk_x64_linux_xxx.tar.gz jdk1.8
- существует
/etc/profile
Добавьте переменные окружения в файлJAVA_HOME
.
vi /etc/profile
# 在profile文件中添加
export JAVA_HOME=/mydata/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
# 使修改后的profile文件生效
. /etc/profile
Установитьkafka-eagle
- скачать
kafka-eagle
установочный пакет, адрес загрузки:GitHub.com/умный LOLI/can…
- После завершения загрузки появится
kafka-eagle
Распаковать в указанную директорию;
cd /mydata/kafka/
tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
- существует
/etc/profile
Добавьте переменные окружения в файлKE_HOME
;
vi /etc/profile
# 在profile文件中添加
export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
export PATH=$PATH:$KE_HOME/bin
# 使修改后的profile文件生效
. /etc/profile
-
Установите MySQL и добавьте базу данных
ke
,kafka-eagle
будет использовать его позже; -
Изменить файл конфигурации
$KE_HOME/conf/system-config.properties
, главным образом, чтобы изменить конфигурацию Zookeeper и конфигурацию базы данных, закомментировать конфигурацию sqlite и вместо этого использовать MySQL;
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.JDBC
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
- Начните со следующей команды
kafka-eagle
;
$KE_HOME/bin/ke.sh start
- После выполнения команды будет отображена следующая информация, но это не означает, что служба запущена успешно, и нужно какое-то время подождать;
- Еще несколько полезных
kafka-eagle
Заказ:
# 停止服务
$KE_HOME/bin/ke.sh stop
# 重启服务
$KE_HOME/bin/ke.sh restart
# 查看服务运行状态
$KE_HOME/bin/ke.sh status
# 查看服务状态
$KE_HOME/bin/ke.sh stats
# 动态查看服务输出日志
tail -f $KE_HOME/logs/ke_console.out
- После успешного запуска вы можете получить прямой доступ, введите пароль учетной записи
admin:123456
,адрес:http://192.168.5.78:8048/
- После успешного входа в систему вы можете получить доступ к панели инструментов, и интерфейс по-прежнему великолепен!
Использование инструментов визуализации
- До того, как мы использовали командную строку для создания темы, здесь ее можно создать прямо через интерфейс;
- Мы также можем напрямую передать
kafka-eagle
Отправить сообщение;
- Мы можем получать сообщения в теме через командную строку;
bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server 192.168.5.78:9092
- Информация, полученная из консоли, отображается следующим образом;
- Также есть интересная функция под названием
KSQL
, вы можете запрашивать сообщения в теме с помощью операторов SQL;
- Визуальные инструменты, естественно, незаменимы для мониторинга, если вы хотите открыть
kafka-eagle
Для функции мониторинга Kafka вам необходимо изменить сценарий запуска Kafka, чтобы открыть порт JMX;
vi kafka-server-start.sh
# 暴露JMX端口
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
- Взгляните на интерфейс диаграммы мониторинга;
- Также есть очень раздражающая функция мониторинга большого экрана;
- Существует также функция командной строки Zookeeper, короче говоря, она очень функциональна и мощна!
SpringBoot интегрирует Kafka
Также очень просто управлять Kafka в SpringBoot, например, режим сообщений Kafka очень простой, очереди нет, только Topic.
- Первое приложение
pom.xml
Добавьте зависимость Spring Kafka в;
<!--Spring整合Kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
- Изменить файл конфигурации приложения
application.yml
, настройте адрес службы Kafka и потребительgroup-id
;
server:
port: 8088
spring:
kafka:
bootstrap-servers: '192.168.5.78:9092'
consumer:
group-id: "bootGroup"
- Создайте производителя для отправки сообщений в тему Кафки;
/**
* Kafka消息生产者
* Created by macro on 2021/5/19.
*/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String message){
kafkaTemplate.send("bootTopic",message);
}
}
- Создайте потребителя, чтобы получать сообщения от Kafka и потреблять;
/**
* Kafka消息消费者
* Created by macro on 2021/5/19.
*/
@Slf4j
@Component
public class KafkaConsumer {
@KafkaListener(topics = "bootTopic")
public void processMessage(String content) {
log.info("consumer processMessage : {}",content);
}
}
- Создать интерфейс для отправки сообщений и вызвать производителя для отправки сообщений;
/**
* Kafka功能测试
* Created by macro on 2021/5/19.
*/
@Api(tags = "KafkaController", description = "Kafka功能测试")
@Controller
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaProducer kafkaProducer;
@ApiOperation("发送消息")
@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
@ResponseBody
public CommonResult sendMessage(@RequestParam String message) {
kafkaProducer.send(message);
return CommonResult.success(null);
}
}
- Вызовите интерфейс прямо в Swagger для тестирования;
- Консоль проекта выведет следующую информацию, указывающую, что сообщение получено и использовано.
2021-05-19 16:59:21.016 INFO 2344 --- [ntainer#0-0-C-1] c.m.mall.tiny.component.KafkaConsumer : consumer processMessage : Spring Boot message!
Суммировать
Благодаря практике, описанной в этой статье, каждый может начать работу с Kafka. Установка, инструменты визуализации и SpringBoot — это в основном операции, связанные с разработчиками, и они также являются единственным способом изучения Kafka.
использованная литература
-
Официальная документация Кафки:kafka.apache.org/quickstart
-
kafka-eagle
Официальная документация:Woohoo. Кафка - eagle.org/articles/do… -
Понятия, связанные с Кафкой:nuggets.capable/post/684490…
Адрес исходного кода проекта
Эта статьяGitHubGitHub.com/macro-positive/…Записано, приветствую всех на Star!