Spring Boot 2.X Combat -- очередь сообщений (RocketMQ)

Spring Boot

Автор: Сяосянь

Репозиторий исходного кода:GitHub.com/Округ Чжашуй/…

в предыдущем разделе«Актуальная база данных SQL (MyBatis)»В нем в основном рассказывается, как MyBatis подключается к базе данных и реализует такие операции, как добавление, удаление, изменение и запрос данных. В этом разделе мы интегрируем RocketMQ с Spring Boot. Промежуточное ПО для сообщений является важным компонентом современных распределенных систем. RocketMQ — это промежуточное ПО для распределенных сообщений с открытым исходным кодом, обеспечивающее низкую задержку, высокую производительность, высокую доступность и масштабируемые службы публикации сообщений и подписки, поддерживающие емкость уровня триллиона.

RocketMQ был разработан командой Alibaba с использованием языка Java и предоставлен Apache Foundation в 2016 году. Это проект Apache верхнего уровня.

1) Установка и эксплуатация RocketMQ

Требуется для установки и запуска RocketMQпредпосылки:

  • 64-битная система, рекомендуется Linux/Unix/Mac, также может работать система Windows
  • 64-битный JDK 1.8+
  • 4G бесплатный диск

Загрузите RocketMQ 4.6.1 и откройте официальный сайт RocketMQ.ракета в настоящее время.apache.org/release_not…, выберите бинарный файл:

下载 RocketMQ

После завершения загрузки разархивируйте его в каталог установки, откройте терминал, чтобы войти в каталог установки ROCKETMQ_HOME, и выполните следующую команду:

Установите минимальный и максимальный объем памяти для JVM.

# 打开 runbroker.sh 或者 runbroker.cmd(Windows)
# 根据电脑内存情况设置 JVM 最大内存和最小内存
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

Запустить сервер имен

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
# 如果成功的话可以看到这样的内容
The Name Server boot success. serializeType=JSON

Запустите брокера

> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log 
The broker[..., ...] boot success. serializeType=JSON and name server is localhost:9876

Закрыть сервер

 > sh bin/mqshutdown broker
 > sh bin/mqshutdown namesrv

Система Windows

# Windows 系统需要设置环境变量 %ROCKETMQ_HOME%
> cd %ROCKETMQ_HOME%\bin
> .\mqnamesrv
# 成功后会在终端看到这样子的输出
The Name Server boot success. serializeType=JSON
# 重新打开一个终端
> cd %ROCKETMQ_HOME%\bin
> .\mqbroker.cmd -n localhost:9876
The broker[..., ...] boot success. serializeType=JSON and name server is localhost:9876
# 在 Windows 关闭 Server 通过关闭终端或者 Ctrl + C 终止任务吧

2) Начать

rocketmq-spring-boot-starterЭто стартер (Starter) для Spring Boot для быстрой интеграции с RocketMQ, для которого требуется Spring Boot 2.0 и выше.

Практическая Spring Boot интегрирует RocketMQ для написания сообщений (Producer) и использования сообщений (Consumer).

2.1) Новый проект и общая конфигурация

Будет два новых проекта, 04-rocketmq-producer и 04-rocketmq-consumer, для производства и потребления информации соответственно. Выберите 2.1.13 для Spring Boot и выберите Spring Web для зависимостей.За исключением имени проекта, другие конфигурации в основном одинаковы.

Добавить кrocketmq-spring-boot-starter:

// Gradle
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.0'
<!-- Maven -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

настроитьapplication.properties

# 04-rocketmq-producer 不需要设置 spring.main.web-application-type
# none 表示不启动 Web 容器
spring.main.web-application-type=none
spring.application.name=rocketmq-consumer
# RocketMQ Name Server (替换为 RocketMQ 的 IP 地址和端口号)
rocketmq.name-server=192.168.128.10:9876
# 兹定于 Name Server
boot.rocketmq.NameServer=192.168.128.10:9876
# 程序用使用到的属性配置 (替换为 RocketMQ 的 IP 地址和端口号)
boot.rocketmq.topic=string-topic
boot.rocketmq.topic.user=user-topic
boot.rocketmq.tag=tagA

Создайте новый класс User в обоих проектах:

public class User {
    private String userName;
    private Byte userAge;
   	// 省略 Getter Setter
    @Override
    public String toString() {
        return "User{" +
                "userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}

2.2) Производитель реализует написание сообщений

Название проекта 04-rocketmq-producer. Реализует запись сообщений, полученных от RESTful API, в RocketMQ.

Создайте новый класс ProducerService.class:

@Service
public class ProducerService {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Resource
    private RocketMQTemplate mqTemplate;

    @Value(value = "${boot.rocketmq.topic}")
    private String springTopic;

    @Value(value = "${boot.rocketmq.topic.user}")
    private String userTopic;

    @Value(value = "${boot.rocketmq.tag}")
    private String tag;

    public SendResult sendString(String message) {
        // 发送 String 类型的消息
        // 调用 RocketMQTemplate 的 syncSend 方法
        SendResult sendResult = mqTemplate.syncSend(springTopic + ":" + tag, message);
        logger.info("syncSend String to topic {} sendResult={} \n", springTopic, sendResult);
        return sendResult;
    }

    public SendResult sendUser(User user) {
        // 发送 User
        SendResult sendResult = mqTemplate.syncSend(userTopic, user);
        logger.info("syncSend User to topic {} sendResult= {} \n", userTopic, sendResult);
        return sendResult;
    }
}

Анализ кода:

@Value(value = "${boot.rocketmq.topic}"): будетapplication.propertiesЗначение boot.rocketmq.topic, определенное в файле, автоматически вставляется в переменную springTopic.

Создайте новый RESTful API, ProducerController.class

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Resource ProducerService producerService;

    @PostMapping("/string")
    public SendResult sendString(@RequestBody String message){
        return producerService.sendString(message);
    }

    @PostMapping("/user")
    public SendResult sendUser(@RequestBody User user){
        return producerService.sendUser(user);
    }
}

2.2) Информация о потребительском потреблении

Имя проекта — 04-rocketmq-consumer, который реализует чтение и потребление сообщений в RocketMQ. УведомлениеЭтот проект не требует запуска веб-контейнера.

StringConsumer.classПотреблять сообщения типа String.

@Service
@RocketMQMessageListener(topic = "${boot.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${boot.rocketmq.tag}")
public class StringConsumer implements RocketMQListener<String> {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(String message) {
        // 重写消息处理方法
        logger.info("------- StringConsumer received:{} \n", message);
        // TODO 对消息进行处理,比如写入数据
    }
}

UserConsumer.classПотреблять сообщения типа Пользователь

@Service
@RocketMQMessageListener(nameServer = "${boot.rocketmq.NameServer}", topic = "${boot.rocketmq.topic.user}", consumerGroup = "user_consumer")
public class UserConsumer implements RocketMQListener<User> {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(User user) {
        logger.info("######## user_consumer received: {} ; age: {} ; name: {} \n", user,user.getUserAge(),user.getUserName());
        // TODO 对消息进行处理User
    }
}

Анализ кода:

@RocketMQMessageListener: указывает тему, ConsumerGroup, selectorExpression и т. д. для мониторинга;

тема: тема сообщения, указывающая тип сообщения, например тема строки и тема пользователя выше, тема = "тема строки" указывает, что значение потребляет сообщение темы темы строки;

consumerGroup: группа потребителей, одна и та же группа потребителей обычно потребляет одно и то же сообщение;

selectorExpression: Выберите тег, selectorExpression="tagA", используйте только сообщение с тегом tagA, по умолчанию "*", то есть все теги;

RocketMQListener: Для реализации RocketMQListener нам нужно только переписать метод обработки сообщений;

3) Запустить проект

Запустите RocketMQ и запустите 04-rocketmq-producer и 04-rocketmq-consumer соответственно.

Веб-порт, на котором работает источник, — 8080, а потребитель не запускает веб-контейнер.

Запустите Consumer, и вы увидите следующий вывод журнала:

DefaultRocketMQListenerContainer{consumerGroup='user_consumer', nameServer='192.168.128.10:9876', topic='user-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
2020-03-16 23:11:19.636  INFO 16092 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:userConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2020-03-16 23:11:19.924  INFO 16092 --- [           main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='string_consumer', nameServer='192.168.128.10:9876', topic='string-topic', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='tagA', messageModel=CLUSTERING}
2020-03-16 23:11:19.924  INFO 16092 --- [           main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:stringConsumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2

Открыть Postname, тестировать сообщения типа String, доступhttp://localhost:8080/producer/string

image-20200316231442381

Вывод журнала производителя:

2020-03-16 23:14:21.681  INFO 16776 --- [nio-8080-exec-2] org.xian.producer.ProducerService        : 
syncSend String to topic string-topic sendResult=SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523BF6E30000, offsetMsgId=C0A8800A00002A9F000000000003ADF2, messageQueue=MessageQueue [topic=string-topic, brokerName=master, queueId=2], queueOffset=4] 

Вывод журнала потребителей:

2020-03-16 23:14:21.983  INFO 16092 --- [MessageThread_1] org.xian.consumer.StringConsumer         : 
------- StringConsumer received:Hello RocketMQ By Spring Boot 0

Чтобы протестировать сообщения типа User, откройтеhttp://localhost:8080/producer/user

image-20200316231826149

Вывод журнала производителя:

2020-03-16 23:18:11.548  INFO 16776 --- [nio-8080-exec-5] org.xian.producer.ProducerService        : 
syncSend User to topic user-topic sendResult= SendResult [sendStatus=SEND_OK, msgId=0000000000000000000000000000000100001F89AB83523F79590003, offsetMsgId=C0A8800A00002A9F000000000003B11F, messageQueue=MessageQueue [topic=user-topic, brokerName=master, queueId=3], queueOffset=2] 

Вывод журнала потребителей:

2020-03-16 23:18:11.591  INFO 16092 --- [MessageThread_1] org.xian.consumer.UserConsumer           : 
######## user_consumer received: User{userName='RocketMQ With Spring Boot', userAge=4} ; age: 4 ; name: RocketMQ With Spring Boot 

Справочник и расширенное чтение:

В этой главе в основном рассказывается, как запустить RocketMQ на одном компьютере и как Spring Boot интегрирует RocketMQ и реализует генерацию и потребление сообщений, Как запустить RocketMQ в кластере и расширенное использование RocketMQ здесь не будет. В следующей главе мы начнем реальную боевую структуру безопасности Spring, которая в основном включает:

  • Spring Security
  • Spring Security интегрирует JJWT для реализации входа и проверки токена.
  • Интеграция Широ (токен)
  • Реализовать вход в WeChat (токен)

Добро пожаловать в "Advanced Programming Technology" или в блог Xiaoxian.