Промежуточное ПО для сообщений - построение среды RocketMQ (1) (полная версия)

Java
Промежуточное ПО для сообщений - построение среды RocketMQ (1) (полная версия)

求关注
环境搭建

Немного позитива в каждой главе: самоконтроль — это инстинкт сильнейшего. - Бернард Шоу

предисловие

В последнее время я изучаю промежуточное ПО сообщений — RocketMQ, и планирую записывать этот процесс обучения. Эта глава в основном знакомит с настройкой среды. На этот раз это в основном автономная конструкция (с ограниченными условиями), включая сборку в средах Windows и Linux, а также сборку платформы мониторинга консоли, и, наконец, добавлена ​​демонстрация для проверки.

Подготовка окружающей среды

Перед сборкой RocketMQ убедитесь, что настроена следующая среда.

  • Среда Java (мой JDK1.8)
  • Среда Maven (моя последняя версия 3.6.1)
  • Git-окружение

Студенты, которые еще не построили, проходят через портал:

Создание среды JDK:Создание среды JAVA8Создание среды Maven:Использование Nexus 3.X в среде Windows для создания частного сервера Maven и использования введенияСреда Git для сборки:Создание и настройка окружения Git


1. Сборка в среде Windows

1.1 Скачать

Официальный сайт:rocketmq.apache.org/

官网
Последняя версия V4.5.0, нажмите, чтобы войти.
rocketmq-all-4.5.0-bin-release.zip
Выберите загрузку Rocketmq-all-4.5.0-bin-release.zip. Появится еще одна страница, здесь выберите Rocketmq-all-4.5.0-bin-release.zip для загрузки.
rocketmq-all-4.5.0-bin-release.zip
После успешной загрузки выберите каталог и разархивируйте его.
解压

1.2 Изменить конфигурацию JVM

После выполнения вышеуказанных операций заходим в директорию bin директория, здесь яH:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin. оказатьсяrunserver.cmdиrunbroker.cmdJAVA_OPT в .

在这里插入图片描述
Исходный JAVA_OPT:

set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

Измените два значения Xms Xmx на меньшие, на 1g, например:

set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

Вы можете установить его в соответствии с объемом памяти виртуальной машины.При превышении объема памяти может быть сообщено об ошибке.

1.3 Настройка переменных среды

После выполнения вышеуказанных шагов нам нужно настроить каталог bin каталога установки RocketMQ в переменной среды.

RocketMQ_HOME

1.4 Запуск

Вышеупомянутая конфигурация завершена, следующим шагом является процесс запуска. В середине есть яма, обязательно установите ее в соответствии с шагами.

В каталоге bin каталога установки RocketMQ выполните команду cmd:

мой каталог:

H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin

Параметры окна cmd можно вызвать с помощью Shift+щелчок правой кнопкой мыши. Вы также можете ввести cmd в окне через win + R, войти в окно cmd и перейти в каталог bin.

1.4.1 Запустите NAMESERVER

  • Выполните команду: запустите mqnamesrv.cmd

После успеха появится окно подсказки, не закрывайте это окно.

success

1.4.3 Запуск БРОКЕРА

  • Выполните команду: «запустить mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true»

Примечание. Если появится всплывающее окно с сообщением «Ошибка: не удалось найти или загрузить основной класс xxxxxx». Откройте runbroker.cmd и поместите «%CLASSPATH%» в двойные кавычки.

失败
Открытымrunbroker.cmdмодифицировать Оригинал:

set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"

После модификации:

set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""

修改后
Выполните команду еще раз: Успешно стартовал!

成功
Всего три окна.


2. Установите консольный мониторинг

2.1 Скачать

ссылка для скачивания:GitHub.com/Apache/рок…

После загрузки, как показано на рисунке: выберите --> RocketMQ-Console

rocketmq-console

2.2 Конфигурация

После завершения загрузки перейдите в папку «rocketmq-externals\rocketmq-console\src\main\resources» и откройте «application.properties» для настройки.

修改配置
修改配置

2.2 Скомпилируйте и запустите

Войдите в папку '\ rocketmq-externals\rocketmq-console', выполните 'mvn clean package -Dmaven.test.skip=true', скомпилируйте и сгенерируйте. В середине происходит относительно медленный процесс загрузки, который нужно подождать.

编译
После успешной компиляции cmd входит в «целевую» папку, выполняет «java -jar rocketmq-console-ng-1.0.1.jar» и запускает «rocketmq-console-ng-1.0.1.jar».

rocketmq-console-ng-1.0.1.jar
rocketmq-console-ng-1.0.1.jar

2.3 Просмотр

Адрес доступа: локальный: 8082

界面

2. Сборка в среде Linux

2.1 Подготовка окружающей среды

  • Java-среда
  • Среда Maven

2.1.1 Среда Linux для сборки Jdk

Скачать JDK:Woohoo. Сеть Oracle.com/tech…

Скачайте нужную версию:

下载地址

Загрузить в созданный каталог /usr/java

команда распаковать

tar -zxvf jdk-8u181-linux-x64.tar.gz

команда настройки переменной среды

vim /etc/profile
 
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
 
source /etc/profile

Убедитесь, что команда выполнена успешно


java -version

JDK安装成功

Выполните описанные выше шаги, чтобы завершить установку JDK. Затем установите среду Maven.

2.1.2 Сборка Maven в среде Linux

  1. Команда загрузки:

wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz

  1. Команда распаковать:

tar -zxvf apache-maven-3.2.2-bin.tar.gz

  1. Настройте команду среды Maven:

vim /etc/profile
 
#配置maven环境变量
export MAVEN_HOME=/usr/maven/apache-maven-3.5.4
export MAVEN_HOME
export PATH=$PATH:$MAVEN_HOME/bin
 
source /etc/profile

  1. Подтвердите успех команды:

mvn -v


Maven安装成功

2.2 Скачать RocketMQ

  1. Команда загрузки:

wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip

  1. Команда распаковать:
unzip rocketmq-all-4.4.0-source-release.zip

解压完毕

  1. построить двоичную команду

Перейдите в папку с распакованным файлом.

mvn -Prelease-all -DskipTests clean install -U

构建二进制成功

2.3 Изменить конфигурацию JVM

Как и в среде Windows, измените конфигурацию JVM. перейти в каталог/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/binсередина. Отредактируйте каталог binrunserver.shиrunbroker.shдокумент.

Изменить в соответствии с размером личной виртуальной машины


vim runserver.sh 
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"

修改JVM配置

2.4 Настройка переменных среды RocketMQ

Выполните следующие команды соответственно:

#修改环境变量
vim /etc/profile

export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
export PATH=$PATH:$ROCKETMQ/bin

#更新配置
source /etc/profile

配置RocketMQ环境变量

2.5 Запустите ИМЯСЕРВЕР

все еще в предыдущем каталоге/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq

  • Выполнение заказа:
##启动命令
nohup sh bin/mqnamesrv  >/dev/null 2>&1 &

##查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log

启动NAMESERVER

Видно, что картина удалась!

2.6 Запуск БРОКЕРА

  • Выполнение заказа:
##启动命令
nohup sh bin/mqbroker -n localhost:9876 &

##查看日志
tail -f ~/logs/rocketmqlogs/broker.log

启动BROKER

Обратите внимание на брандмауэр.Если подключение к порту не удается, обратите внимание на его открытие.

2.7 Команда отключения

sh bin/mqshutdown broker    //停止 broker
 
sh bin/mqshutdown namesrv   //停止 nameserver

2.8 Настройка платформы мониторинга консоли

Сборка с платформой Windows

2.8.1 Запустить консоль

Я напрямую перекинул пакет jar, упакованный на платформе Windows, в систему Linux.

  • Команда запуска:
java -jar rocketmq-console-ng-1.0.1.jar

console 启动

2.8.2 Доступ к интерфейсу управления Консолью

адрес:http://192.168.220.72:8082

Console管理界面


3. Описание платформы консольного мониторинга

Не вдаваясь в подробности здесь, вы можете обратиться к следующим статьям.

Адрес официального сайта:GitHub.com/Apache/рок…

Адреса других блогов:Живой .net/rocket пока только…

3. Кейс-тест

Среда интеграции корпуса: среда SpringBoot Дело взято из интернета

3.1 файл pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.coderprogramming.rocketmq</groupId>
	<artifactId>rocketmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>rocketmq</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.0.2</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

3.2 Производитель производитель


**
&emsp;* @Description: 生产者
&emsp;* @author Coder编程
&emsp;* @date 2019/5/8 17:08
&emsp;*/

@Component
public class Producer {
    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;


    public void orderedProducer() throws MQClientException, InterruptedException {
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ProducerGroupName需要由应用来保证唯一
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();


        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        try {
            for (int i = 0; i < 10; i++) {
                    Message msg = new Message("Topic1",// topic
                            "TagA",// tag
                            "001",// key
                            ("Send Msg:Hello MetaQ1").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);

                    Message msg2 = new Message("Topic2",// topic
                            "TagB",// tag
                            "002",// key
                            ("Send Msg:Hello MetaQ2").getBytes());// body
                    SendResult sendResult2 = producer.send(msg2);
                    System.out.println(sendResult2);


                    Message msg3 = new Message("Topic3",// topic
                            "TagC",// tag
                            "003",// key
                            ("Send Msg:Hello MetaQ3").getBytes());// body
                    SendResult sendResult3 = producer.send(msg3);
                    System.out.println(sendResult3);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
    }

}


3.3 Потребитель



/**
&emsp;* @Description: 消费者
&emsp;* @author Coder编程
&emsp;* @date 2019/5/8 17:08
&emsp;*/

@Component
public class Consumer {

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;


    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
     * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
     */
    public void orderedConsumer() throws InterruptedException,MQClientException {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
        // consumer.setNamesrvAddr("10.10.0.102:9876");
        consumer.setNamesrvAddr(namesrvAddr);
        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("Topic1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("Topic2", "*");
        consumer.subscribe("Topic3", "*");


        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("Topic1")) {
                    if (null != msg.getTags()) {
                        // 执行Topic1的消费逻辑
                        if (msg.getTags().equals("TagA")) {
                            // 执行TagA的消费
                            System.out.println("TagA开始。");
                        } else if (msg.getTags().equals("TagC")) {
                            System.out.println("TagC开始。");
                            // 执行TagC的消费
                        } else if (msg.getTags().equals("TagD")) {
                            // 执行TagD的消费
                            System.out.println("TagD开始。");
                        }
                    }
                } else if (msg.getTopic().equals("Topic2")) {
                    // 执行Topic2的消费逻辑
                    System.out.println("Topic2");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
         */
        consumer.start();
        System.out.println("Consumer Started.");
    }

}

3.3 файл конфигурации свойств


# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.220.72:9876
# 设置应用端口
server.port=8089

3.4 Тестовый код


/**
 * @author Coder编程
 * @Title: HelloWord
 * @ProjectName rocketmq
 * @Description: Hello World
 * @date 2019/5/814:14
 */

@RestController
public class Test {

    @Autowired
    private Producer producer;

    @Autowired
    private Consumer consumer;

    @RequestMapping("/test")
    public String testMQ2() {
        try {
            System.out.println("-----------------开始生产-----------------");
            producer.orderedProducer();
            System.out.println("-----------------开始消费-----------------");
            consumer.orderedConsumer();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }

}


4. Предоставьте исходный код

Приведенный выше установочный пакет jar и исходный код тестового случая были загружены на GitHub/Gitee.

源码图
Адрес источника:

адрес гитхаба

Адрес дома

конец статьи

Добро пожаловать в публичный аккаунт:Программирование кодераПолучайте последние оригинальные технические статьи и соответствующие бесплатные учебные материалы и изучайте технические знания в любое время и в любом месте!

微信公众号
求关注