Spring Cloud Alibaba (5) Реализация асинхронной связи RocketMQ

база данных

В этой статье обсуждается, как использовать RocketMQ Binder для подписки и публикации сообщений приложения Spring Cloud.

представлять

RocketMQЭто распределенная система обмена сообщениями с открытым исходным кодом, основанная на технологии распределенного кластера с высокой доступностью, предоставляющая услуги публикации сообщений и подписки с низкой задержкой и высокой надежностью, широко используемая во многих областях, включая развязку асинхронной связи, корпоративные решения, финансовые платежи, телекоммуникации. , электронная коммерция, экспресс-логистика, рекламный маркетинг, социальные сети, обмен мгновенными сообщениями, мобильные приложения, мобильные игры, видео, Интернет вещей, Интернет транспортных средств и др.

RocketMQ - это промежуточное ПО для распределенного обмена сообщениями с открытым исходным кодом от Alibaba, выпущенное в 2012 году. Оно было передано в дар Apache Software Foundation и 25 сентября 2017 года стало проектом высшего уровня Apache. В качестве отечественного промежуточного программного обеспечения, которое неоднократно проходило крещение в «суперпроектах», таких как Double Eleven от Alibaba, и имеет стабильную и выдающуюся производительность, в последние годы оно используется все больше и больше благодаря своей высокой производительности, низкой задержке и высокой надежности. , используется отечественными компаниями.

Возможности RocketMQ

  • Это промежуточное программное обеспечение сообщений модели очереди, которое обладает характеристиками высокой производительности, высокой надежности, высокой производительности в реальном времени, распределенной и так далее.
  • Производитель, Потребитель и Очередь могут быть распределены
  • Источник отправляет сообщения в некоторые очереди по очереди. Набор очередей называется темой. Если потребитель осуществляет широковещательное потребление, один экземпляр потребителя потребляет все очереди, соответствующие этой теме. Если он используется для потребления кластером, несколько экземпляров получателя потребляют набор очередей, соответствующих данной Теме в среднем.
  • Возможность гарантировать строгий порядок сообщений
  • Поддерживает два режима сообщений pull (тянуть) и push (толкать)
  • Эффективная горизонтальная масштабируемость абонента
  • Механизм подписки на сообщения в реальном времени
  • Возможность накопления сообщений на уровне миллиардов
  • Поддерживает несколько протоколов обмена сообщениями, таких как JMS, OpenMessaging и т. д.
  • менее зависимый

Spring Cloud Stream

Spring Cloud Stream — это платформа для создания микросервисов, управляемых сообщениями.

Spring Cloud Stream предоставляет унифицированную абстракцию для конфигурации промежуточного программного обеспечения сообщений и представляет поддержку унифицированной модели для публикаций и подписок, групп потребителей, семантики и разделов с отслеживанием состояния.

Основными компонентами Spring Cloud Stream являются: Binders, Bindings и Message. Приложение взаимодействует с связующим через входы или выходы и привязывается через нашу конфигурацию, а связующее отвечает за взаимодействие с промежуточным программным обеспечением. Сообщение представляет собой унифицированный формат спецификации данных. для обмена данными.

  • Привязка: включая привязку ввода и привязку вывода.

Связывание обеспечивает мост между промежуточным программным обеспечением сообщений и поставщиком и потребителем, предоставляемым приложением, так что разработчикам нужно использовать только поставщика или потребителя приложения для создания или потребления данных, что защищает разработчика от нижележащего промежуточного программного обеспечения сообщений.

  • Binder: компонент, интегрированный с внешним промежуточным ПО для создания привязки.Каждое промежуточное ПО для сообщений имеет собственную реализацию Binder.

НапримерKafkaреализацияKafkaMessageChannelBinder,RabbitMQреализацияRabbitMessageChannelBinderтак же какRocketMQреализацияRocketMQMessageChannelBinder.

  • Сообщение: это модуль в Spring Framework, и его ролью является программная модель унифицированного обмена сообщениями.

Например, модель, соответствующая обмену сообщениями, включает в себя полезную нагрузку тела сообщения и заголовок заголовка сообщения.

официальный сайт spring-cloud-stream

Window собирает и развертывает RocketMQ

скачать

Текущая последняя версия 4.6.0

Загрузите его и распакуйте в: каталог D:rocketmq, каталог не должен содержать пробелов и слишком глубок, иначе работа службы может сообщить об ошибке

Запустите службу NameServer

Перед запуском необходимо настроить системную среду, иначе будет сообщено об ошибке.

Please set the ROCKETMQ_HOME variable in your environment! 

Имя переменной системной среды: ROCKETMQ_HOME

Настройте переменные среды в соответствии с каталогом, который вы распаковали.Например, мое значение переменной: D:rocketmq

Войдите в командное окно окна, войдите в каталог D:rocketmqbin и выполните

start mqnamesrv.cmd

Как и выше, NameServer успешно запускается. Во время использования окно не должно быть закрыто.

Запустить брокерскую службу

Войдите в каталог bin и введите

start mqbroker.cmd -n localhost:9876

Вышеупомянутый ip+port — это адрес службы и порт RocketMQ.

Запустить от имени, вы можете сообщить о следующей ошибке. Не удается найти или загрузить основной класс

В этом случае откройте bin-->runbroker.cmd и измените %CLASSPATH% на «%CLASSPATH%».

Сохраните и снова выполните приведенную выше команду. После успешного выполнения подсказка об успешной загрузке означает успех.

Пример

В этом примере реализованы три вида публикации сообщений и получения подписки.

Создайте производителя сообщений RocketMQ

Создайте проект ali-rocketmq-producer, порт: 28081.

  • POM.xml Добавить зависимость
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>ali-rocketmq-producer</artifactId>
    <packaging>jar</packaging>

    <dependencies>

        <!--rocketmq依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <!--web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • Настройте информацию о привязке вывода и сотрудничайте с@EnableBindingАннотируйте, чтобы заставить его работать

конфигурация application.yml

server:
  port: 28081

spring:
  application:
    name: ali-rocketmq-producer
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 服务器地址
          name-server: 127.0.0.1:9876
      bindings:
        output1: {destination: test-topic1, content-type: application/json}
        output2: {destination: test-topic2, content-type: application/json}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArProduceApplication.java

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArProduceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArProduceApplication.class, args);
    }
}

  • служба производителя сообщений

MySource.java

package com.easy.arProduce;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("output1")
    MessageChannel output1();

    @Output("output2")
    MessageChannel output2();
}

SenderService.java

package com.easy.arProduce;

import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class SenderService {

    @Autowired
    private MySource source;

    /**
     * 发送字符串
     *
     * @param msg
     */
    public void send(String msg) {
        Message message = MessageBuilder.withPayload(msg)
                .build();
        source.output1().send(message);
    }

    /**
     * 发送带tag的字符串
     *
     * @param msg
     * @param tag
     */
    public void sendWithTags(String msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .build();
        source.output1().send(message);
    }

    /**
     * 发送对象
     *
     * @param msg
     * @param tag
     * @param <T>
     */
    public <T> void sendObject(T msg, String tag) {
        Message message = MessageBuilder.withPayload(msg)
                .setHeader(RocketMQHeaders.TAGS, tag)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();
        source.output2().send(message);
    }
}

Напишите контроллер TestController.java для облегчения тестирования.

package com.easy.arProduce;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "test")
public class TestController {
    @Autowired
    SenderService senderService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String send(String msg) {
        senderService.send(msg);
        return "字符串消息发送成功!";
    }

    @RequestMapping(value = "/sendWithTags", method = RequestMethod.GET)
    public String sendWithTags(String msg) {
        senderService.sendWithTags(msg, "tagStr");
        return "带tag字符串消息发送成功!";
    }

    @RequestMapping(value = "/sendObject", method = RequestMethod.GET)
    public String sendObject(int index) {
        senderService.sendObject(new Foo(index, "foo"), "tagObj");
        return "Object对象消息发送成功!";
    }
}

Создайте потребителя сообщений RocketMQ

Создайте проект ali-rocketmq-consumer, порт: 28082.

  • pom.xml добавить зависимости
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>cloud-alibaba</artifactId>
        <groupId>com.easy</groupId>
        <version>1.0.0</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>

    <artifactId>ali-rocketmq-consumer</artifactId>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

- Настройте информацию о привязке ввода и сотрудничайте с@EnableBindingАннотируйте, чтобы заставить его работать

конфигурация application.yml

server:
  port: 28082

spring:
  application:
    name: ali-rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876 #rocketmq 服务地址
        bindings:
          input1: {consumer.orderly: true}  #是否排序
          input2: {consumer.tags: tagStr}   #订阅 带tag值为tagStr的字符串
          input3: {consumer.tags: tagObj}   #订阅 带tag值为tabObj的字符串
      bindings:
        input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}
        input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}
        input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}

management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

ArConsumerApplication.java

package com.easy.arConsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding({MySource.class})
public class ArConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ArConsumerApplication.class, args);
    }
}

  • служба обработки сообщений

MySource.java

package com.easy.arConsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {
    @Input("input1")
    SubscribableChannel input1();

    @Input("input2")
    SubscribableChannel input2();

    @Input("input3")
    SubscribableChannel input3();
}

ReceiveService.java

package com.easy.arConsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class ReceiveService {

    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        log.info("input1 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        log.info("input2 接收到了消息:" + receiveMsg);
    }

    @StreamListener("input3")
    public void receiveInput3(@Payload Foo foo) {
        log.info("input3 接收到了消息:" + foo);
    }
}

Пример использования

Примеры связанных проектов

Для этого примера мы создали две реализации проекта

  • ali-rocketmq-producer: Производитель службы сообщений RocketMQ, имя службы: ali-rocketmq-producer, порт: 28081
  • ali-rocketmq-consumer: потребитель службы сообщений RocketMQ, имя службы: ali-rocketmq-producer, порт: 28082

Запустите пробные тесты

Сначала запустите службу ali-rocketmq-producer и службу ali-rocketmq-consumer.

  • Получите доступ к адресу производителя службы сообщений: http://localhost:28081/test/send?msg=yuntian.

Глядя на консоль потребителя службы, вывод

2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:yuntian
2019-12-04 15:37:47.859  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms

Указывает, что потребление строки было успешно использовано input1

  • Доступ к адресу производителя службы сообщений: http://localhost:28081/test/sendWithTags?msg=tagyuntian

Глядя на консоль потребителя службы, вывод

2019-12-04 15:38:09.586  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input2 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input1 接收到了消息:tagyuntian
2019-12-04 15:38:09.592  INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms

Указывает, что строка с тегом была успешно использована input2 и input1, так как input1 также подписался на test-topic1, и мы не добавляли фильтрацию тегов, по умолчанию это означает, что все сообщения получены, поэтому строка tagyuntian также может быть успешно получена

  • Получите доступ к адресу производителя службы сообщений: http://localhost:28081/test/sendObject?index=1.

Глядя на консоль потребителя службы, вывод

2019-12-04 15:41:15.285  INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService       : input3 接收到了消息:Foo{id=1, bar='foo'}

Указывает, что input3 успешно получил объектное сообщение с тегом и tagObj, но input1 не выдал сообщение. Это связано с тем, что сообщение, опубликованное sendObject, проходит через конвейер сообщений test-topic2, поэтому оно не будет опубликовано для подписчиков input1 и input2.

материал

Spring Boot, проект облачного обучения