Eagle, самый популярный графический инструмент Kafka, должен быть вам рекомендован!

Java задняя часть
Eagle, самый популярный графический инструмент Kafka, должен быть вам рекомендован!

Адрес фактического центра электронной коммерции SpringBoot (40k+star):GitHub.com/macro-positive/…

Резюме

Kafka — очень популярное промежуточное ПО для обмена сообщениями, согласно официальному сайту, его используют тысячи компаний. Недавно я практиковал волну Кафки, которая действительно хороша и мощна. Сегодня мы изучим Kafka с трех сторон: установка Kafka под Linux, инструмент визуализации Kafka и совместное использование Kafka и SpringBoot. Я надеюсь, что вы сможете быстро начать работу с Kafka и освоить это популярное промежуточное ПО для сообщений после его прочтения!

Знакомство с Кафкой

Кафка сделанаLinkedInПлатформа распределенного потока сообщений с открытым исходным кодом, разработанная компанией, написанная на Scala и Java. Основная функция — предоставить унифицированную платформу с высокой пропускной способностью и малой задержкой для обработки данных в реальном времени.发布订阅模式система обработки сообщений.

Кафка обладает следующими характеристиками:

  • Высокая пропускная способность и низкая задержка: Kafka очень быстро отправляет и получает сообщения, а задержка при обработке сообщений с использованием кластера может составлять всего 2 мс.
  • Высокая масштабируемость: Kafka может эластично расширяться и сжиматься, а также масштабироваться до тысяч брокеров, сотен тысяч разделов и обрабатывать триллионы сообщений в день.
  • Постоянное хранилище: Kafka может безопасно хранить данные в распределенном, надежном, отказоустойчивом кластере.
  • Высокая доступность: Kafka может эффективно расширять кластер в зоне доступности.Если узел выходит из строя, кластер все еще может нормально работать.

Кафка инсталляция

Мы будем использовать метод установки под Linux, а среда установки — CentOS 7.6. Docker здесь не используется для установки и развертывания Лично мне проще установить напрямую (в основном из-за того, что официальный образ Docker не предоставляется)!

  • После завершения загрузки извлеките Kafka в указанный каталог:
cd /mydata/kafka/
tar -xzf kafka_2.13-2.8.0.tgz
  • После завершения распаковки перейдите в каталог распаковки:
cd kafka_2.13-2.8.0
  • Хотя есть новости, что Kafka собирается удалить Zookeeper, в последней версии Kafka он не был удален, так что вам все равно нужно запустить Zookeeper перед запуском Kafka;

  • Запустите службу Zookeeper, служба будет работать в2181порт;
# 后台运行服务,并把日志输出到当前文件夹下的zookeeper-out.file文件中
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-out.file 2>&1 &
  • Поскольку Kafka в настоящее время развернута на сервере Linux, если вы хотите получить доступ к внешней сети, вам необходимо изменить файл конфигурации Kafka.config/server.properties, измените адрес прослушивания Kafka, иначе он не сможет подключиться;
############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.5.78:9092
  • Наконец, запустите службу Kafka, служба будет работать в9092порт.
# 后台运行服务,并把日志输出到当前文件夹下的kafka-out.file文件中
nohup bin/kafka-server-start.sh config/server.properties > kafka-out.file 2>&1 &

Операции командной строки Kafka

Далее мы используем командную строку для работы с Kafka и знакомимся с использованием Kafka.

  • Сначала создайтеconsoleTopicТема;
bin/kafka-topics.sh --create --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • Затем проверьте Тему;
bin/kafka-topics.sh --describe --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • Отобразится следующая информация о теме;
Topic: consoleTopic	TopicId: tJmxUQ8QRJGlhCSf2ojuGw	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: consoleTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
  • Отправить сообщение в тему:
bin/kafka-console-producer.sh --topic consoleTopic --bootstrap-server 192.168.5.78:9092
  • Вы можете отправлять информацию прямо в командной строке;

  • Снова откройте окно и получите сообщение из темы с помощью следующей команды:
bin/kafka-console-consumer.sh --topic consoleTopic --from-beginning --bootstrap-server 192.168.5.78:9092

Кафка визуализация

Использование командной строки для управления Kafka действительно немного громоздко, давайте попробуем следующий инструмент визуализацииkafka-eagle.

Установить JDK

Если вы используете CentOS, полная версия JDK по умолчанию не установлена, вам нужно установить ее самостоятельно!

  • После завершения загрузки извлеките JDK в указанный каталог;
cd /mydata/java
tar -zxvf OpenJDK8U-jdk_x64_linux_xxx.tar.gz
mv OpenJDK8U-jdk_x64_linux_xxx.tar.gz jdk1.8
  • существует/etc/profileДобавьте переменные окружения в файлJAVA_HOME.
vi /etc/profile
# 在profile文件中添加
export JAVA_HOME=/mydata/java/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin
# 使修改后的profile文件生效
. /etc/profile

Установитьkafka-eagle

  • После завершения загрузки появитсяkafka-eagleРаспаковать в указанную директорию;
cd /mydata/kafka/
tar -zxvf kafka-eagle-web-2.0.5-bin.tar.gz
  • существует/etc/profileДобавьте переменные окружения в файлKE_HOME;
vi /etc/profile
# 在profile文件中添加
export KE_HOME=/mydata/kafka/kafka-eagle-web-2.0.5
export PATH=$PATH:$KE_HOME/bin
# 使修改后的profile文件生效
. /etc/profile
  • Установите MySQL и добавьте базу данныхke,kafka-eagleбудет использовать его позже;

  • Изменить файл конфигурации$KE_HOME/conf/system-config.properties, главным образом, чтобы изменить конфигурацию Zookeeper и конфигурацию базы данных, закомментировать конфигурацию sqlite и вместо этого использовать MySQL;

######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181

######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048

######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.JDBC
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org

######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
  • Начните со следующей командыkafka-eagle;
$KE_HOME/bin/ke.sh start
  • После выполнения команды будет отображена следующая информация, но это не означает, что служба запущена успешно, и нужно какое-то время подождать;

  • Еще несколько полезныхkafka-eagleЗаказ:
# 停止服务
$KE_HOME/bin/ke.sh stop
# 重启服务
$KE_HOME/bin/ke.sh restart
# 查看服务运行状态
$KE_HOME/bin/ke.sh status
# 查看服务状态
$KE_HOME/bin/ke.sh stats
# 动态查看服务输出日志
tail -f $KE_HOME/logs/ke_console.out
  • После успешного запуска вы можете получить прямой доступ, введите пароль учетной записиadmin:123456,адрес:http://192.168.5.78:8048/

  • После успешного входа в систему вы можете получить доступ к панели инструментов, и интерфейс по-прежнему великолепен!

Использование инструментов визуализации

  • До того, как мы использовали командную строку для создания темы, здесь ее можно создать прямо через интерфейс;

  • Мы также можем напрямую передатьkafka-eagleОтправить сообщение;

  • Мы можем получать сообщения в теме через командную строку;
bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server 192.168.5.78:9092
  • Информация, полученная из консоли, отображается следующим образом;

  • Также есть интересная функция под названиемKSQL, вы можете запрашивать сообщения в теме с помощью операторов SQL;

  • Визуальные инструменты, естественно, незаменимы для мониторинга, если вы хотите открытьkafka-eagleДля функции мониторинга Kafka вам необходимо изменить сценарий запуска Kafka, чтобы открыть порт JMX;
vi kafka-server-start.sh
# 暴露JMX端口
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
  • Взгляните на интерфейс диаграммы мониторинга;

  • Также есть очень раздражающая функция мониторинга большого экрана;

  • Существует также функция командной строки Zookeeper, короче говоря, она очень функциональна и мощна!

SpringBoot интегрирует Kafka

Также очень просто управлять Kafka в SpringBoot, например, режим сообщений Kafka очень простой, очереди нет, только Topic.

  • Первое приложениеpom.xmlДобавьте зависимость Spring Kafka в;
<!--Spring整合Kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.1</version>
</dependency>
  • Изменить файл конфигурации приложенияapplication.yml, настройте адрес службы Kafka и потребительgroup-id;
server:
  port: 8088
spring:
  kafka:
    bootstrap-servers: '192.168.5.78:9092'
    consumer:
      group-id: "bootGroup"
  • Создайте производителя для отправки сообщений в тему Кафки;
/**
 * Kafka消息生产者
 * Created by macro on 2021/5/19.
 */
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String message){
        kafkaTemplate.send("bootTopic",message);
    }
}
  • Создайте потребителя, чтобы получать сообщения от Kafka и потреблять;
/**
 * Kafka消息消费者
 * Created by macro on 2021/5/19.
 */
@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "bootTopic")
    public void processMessage(String content) {
        log.info("consumer processMessage : {}",content);
    }

}
  • Создать интерфейс для отправки сообщений и вызвать производителя для отправки сообщений;
/**
 * Kafka功能测试
 * Created by macro on 2021/5/19.
 */
@Api(tags = "KafkaController", description = "Kafka功能测试")
@Controller
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @ApiOperation("发送消息")
    @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
    @ResponseBody
    public CommonResult sendMessage(@RequestParam String message) {
        kafkaProducer.send(message);
        return CommonResult.success(null);
    }
}
  • Вызовите интерфейс прямо в Swagger для тестирования;

  • Консоль проекта выведет следующую информацию, указывающую, что сообщение получено и использовано.
2021-05-19 16:59:21.016  INFO 2344 --- [ntainer#0-0-C-1] c.m.mall.tiny.component.KafkaConsumer    : consumer processMessage : Spring Boot message!

Суммировать

Благодаря практике, описанной в этой статье, каждый может начать работу с Kafka. Установка, инструменты визуализации и SpringBoot — это в основном операции, связанные с разработчиками, и они также являются единственным способом изучения Kafka.

использованная литература

Адрес исходного кода проекта

GitHub.com/macro-positive/…

Эта статьяGitHubGitHub.com/macro-positive/…Записано, приветствую всех на Star!