MQ интегрирует SpringBoot и оптимизацию очереди задержки | Серия RabbitMQ (11)

задняя часть RabbitMQ
MQ интегрирует SpringBoot и оптимизацию очереди задержки | Серия RabbitMQ (11)

Это 16-й день моего участия в августовском испытании обновлений. Узнайте подробности события:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Создайте новый проект SpringBoot, обратитесь к статье за ​​конкретными методами.Два способа создания проекта SpringBoot

  • Структура каталогов выглядит следующим образом

    • image-20210806182220682.png
  • Диаграмма архитектуры кода

  • image-20210806182248690.png

  • Создайте две очереди QA и QB с TTL, установленными на 10S и 40S соответственно.

  • Затем создайте коммутатор X и коммутатор Y с недоставленными буквами, оба из которых относятся к прямому типу.

  • Создайте очередь недоставленных сообщений QD.

1. Очередь задержки

  • банка для импорта помпонов

    • 	<!--RabbitMQ 依赖-->
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-amqp</artifactId>
             </dependency>
             <dependency>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-web</artifactId>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>
                 <version>1.2.47</version>
             </dependency>
             <dependency>
                 <groupId>org.projectlombok</groupId>
                 <artifactId>lombok</artifactId>
             </dependency>
             <!--swagger-->
             <dependency>
                 <groupId>io.springfox</groupId>
                 <artifactId>springfox-swagger2</artifactId>
                 <version>2.9.2</version>
             </dependency>
             <dependency>
                 <groupId>io.springfox</groupId>
                 <artifactId>springfox-swagger-ui</artifactId>
                 <version>2.9.2</version>
             </dependency>
             <!--RabbitMQ 测试依赖-->
             <dependency>
                 <groupId>org.springframework.amqp</groupId>
                 <artifactId>spring-rabbit-test</artifactId>
                 <scope>test</scope>
             </dependency>
      
  • Измените суффикс application.proprties наyml

    • spring:
        rabbitmq:
          host: IP地址
          port: 5672
          username: admin
          password: 111111
      
  • Добавить класс конфигурации Swagger

    • import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import springfox.documentation.builders.ApiInfoBuilder;
      import springfox.documentation.service.ApiInfo;
      import springfox.documentation.service.Contact;
      import springfox.documentation.spi.DocumentationType;
      import springfox.documentation.spring.web.plugins.Docket;
      import springfox.documentation.swagger2.annotations.EnableSwagger2;
      
      /**
       * Swagger配置类
       * @author DingYongJun
       * @date 2021/8/6
       */
      @Configuration
      @EnableSwagger2
      public class SwaggerConfig {
          @Bean
          public Docket webApiConfig() {
              return new Docket(DocumentationType.SWAGGER_2)
                      .groupName("webApi")
                      .apiInfo(webApiInfo())
                      .select()
                      .build();
          }
      
          private ApiInfo webApiInfo() {
              return new ApiInfoBuilder()
                      .title("rabbitmq 接口文档")
                      .description("本文档描述了 rabbitmq 微服务接口定义")
                      .version("1.0")
                      .contact(new Contact("dayu", "https://juejin.cn/user/2084329779387864/posts",
                              "773530472@qq.com"))
                      .build();
          }
      }
      
  • MQ config

    • import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      /**
       * MQ配置类
       * @author DingYongJun
       * @date 2021/8/6
       */
      @Configuration
      public class TtlQueueConfig {
          public static final String X_EXCHANGE = "X";
          public static final String QUEUE_A = "QA";
          public static final String QUEUE_B = "QB";
          public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
          public static final String DEAD_LETTER_QUEUE = "QD";
      
          // 声明 xExchange
          @Bean("xExchange")
          public DirectExchange xExchange() {
              return new DirectExchange(X_EXCHANGE);
          }
      
          // 声明 xExchange
          @Bean("yExchange")
          public DirectExchange yExchange() {
              return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
          }
      
          //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
          @Bean("queueA")
          public Queue queueA() {
              Map<String, Object> args = new HashMap<>(3);
              //声明当前队列绑定的死信交换机
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              //声明当前队列的死信路由 key
              args.put("x-dead-letter-routing-key", "YD");
              //声明队列的 TTL
              args.put("x-message-ttl", 10000);
              return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
          }
      
          // 声明队列 A 绑定 X 交换机
          @Bean
          public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queueA).to(xExchange).with("XA");
          }
      
          //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
          @Bean("queueB")
          public Queue queueB() {
              Map<String, Object> args = new HashMap<>(3);
              //声明当前队列绑定的死信交换机
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              //声明当前队列的死信路由 key
              args.put("x-dead-letter-routing-key", "YD");
              //声明队列的 TTL
              args.put("x-message-ttl", 40000);
              return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
          }
      
          //声明队列 B 绑定 X 交换机
          @Bean
          public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
          }
      
          //声明死信队列 QD
          @Bean("queueD")
          public Queue queueD() {
              return new Queue(DEAD_LETTER_QUEUE);
          }
      
          //声明死信队列 QD 绑定关系
          @Bean
          public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                              @Qualifier("yExchange") DirectExchange yExchange) {
              return BindingBuilder.bind(queueD).to(yExchange).with("YD");
          }
      }
      
  • Контроллер имитирует производителя

    • import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.PathVariable;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      import java.util.Date;
      
      /**
       * controller模拟生产者
       * @author DingYongJun
       * @date 2021/8/6
       */
      @Slf4j
      @RequestMapping("ttl")
      @RestController
      public class SendMsgController {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @GetMapping("sendMsg/{message}")
          public void sendMsg(@PathVariable String message) {
              log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
              rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
              rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
          }
      }
      
  • потребитель

    • import com.rabbitmq.client.Channel;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      import java.io.IOException;
      import java.util.Date;
      
      /**
       * 消费者
       * @author DingYongJun
       * @date 2021/8/6
       */
      @Slf4j
      @Component
      public class DeadLetterQueueConsumer {
          @RabbitListener(queues = "QD")
          public void receiveD(Message message, Channel channel) throws IOException {
              String msg = new String(message.getBody());
              log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
          }
      }
      
  • Что, после интеграции SpringBoot стало намного проще?

  • Готово, приступаем к проекту, входим в браузер

    • http://localhost:8080/ttl/sendMsg/你好呀大鱼
      
  • Результаты

    • image-20210806183209464.png
    • Рассчитайте время, первый временной интервал равен 10 с.
    • Второй бар 40s.
    • Идеально соответствует нашим ожиданиям по дизайну!
    • Первое сообщение становится недоставленным сообщением через 10 с, а затем потребляется потребителями.Второе сообщение становится недоставленным сообщением через 40 с, а затем потребляется.Таким образом создается очередь с задержкой.

Во-вторых, оптимизировать очередь задержки

  • В соответствии с приведенным выше дизайном, что, если нам теперь нужно добавить сообщение с 50-секундной задержкой?

  • Может быть, мы добавляем еще одну конфигурацию MQ? Это слишком неразумно!

  • Если необходимо забронировать конференц-зал, не нужно ли добавлять бесчисленные очереди?

  • Итак, нам нужно оптимизировать приведенную выше структуру ниже!

  • Диаграмма архитектуры кода

    • image-20210806183552128.png
  • Добавлен конфиг MQ.

    • import org.springframework.amqp.core.*;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;
      
      import java.util.HashMap;
      import java.util.Map;
      /**
       * 新的MQ配置类
       * @author DingYongJun
       * @date 2021/8/6
       */
      @Component
      public class MsgTtlQueueConfig {
          public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
          public static final String QUEUE_C = "QC";
      
          //声明队列 C 死信交换机
          @Bean("queueC")
          public Queue queueB() {
              Map<String, Object> args = new HashMap<>(3);
              //声明当前队列绑定的死信交换机
              args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
              //声明当前队列的死信路由 key
              args.put("x-dead-letter-routing-key", "YD");
              //没有声明 TTL 属性
              return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
          }
      
          //声明队列 B 绑定 X 交换机
          @Bean
          public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                        @Qualifier("xExchange") DirectExchange xExchange) {
              return BindingBuilder.bind(queueC).to(xExchange).with("XC");
          }
      }
      
  • controller

    •     @RequestMapping(value = "sendExpirationMsg", method = RequestMethod.GET)
          public void sendMsg(@RequestParam Map<String,Object> parmsMap) {
              String message = parmsMap.get("message").toString();
              String ttlTime = parmsMap.get("ttlTime").toString();
              rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
                  correlationData.getMessageProperties().setExpiration(ttlTime);
                  return correlationData;
              });
              log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttlTime, message);
          }
      
    • Здесь мы используем Map для приема параметров, ведь Map непобедим и прост в использовании~

  • ввод в браузере

    • http://localhost:8080/ttl/sendExpirationMsg?message=你好呀大鱼先生&ttlTime=50000
      http://localhost:8080/ttl/sendExpirationMsg?message=你好呀大鱼先生&ttlTime=5000
      
  • Результаты

    • image-20210806192458906.png
    • Полностью соответствуют требованиям! Время истечения индивидуально! Это интеллект!

3. Плагин MQ реализует очередь задержки

  • Вышеупомянутая функция, кажется, в порядке, верно?

  • Но нам нужно знать, что MQ будет проверять, не просрочено ли первое сообщение, только если первое сообщение задерживается на долгое время.

  • Это приведет к тому, что наше второе сообщение будет выполнено первым без ответа.

  • Потому что очередь в порядке очереди, без проблем, верно?

  • видеть значит верить

  • image-20210806192926498.png

  • очевидно

    • Что нам нужно, так это второе 5-секундное предварительное потребление с задержкой, а затем 50-секундное сообщение с задержкой.
    • Фактически, MQ ожидает выполнения первого, прежде чем немедленно выполнить второй.
    • Вот почему это первый пришел, первый ушел.
  • Плагин MQ помогает нам решить эту проблему

  • Сначала скачайте плагинrabbitmq_delayed_message_exchange

  • Обязательно оптимистично оценивайте версию~ В противном случае будет сообщено об ошибке версии.

  • а потом залить на сервер/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/pluginsПод содержанием

  • воплощать в жизньrabbitmq-plugins enable rabbitmq_delayed_message_exchangeУстановить

  • перезапустить MQsystemctl restart rabbitmq-server

  • Взгляните на нашу бэкэнд-страницу MQ.

  • image-20210806215903919.png

  • Появилсяx-delayed-messageТип переключателя , поздравляем с успешным включением плагина!

  • MQ config

    • import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.CustomExchange;
      import org.springframework.amqp.core.Queue;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      @Configuration
      public class DelayedQueueConfig {
          public static final String DELAYED_QUEUE_NAME = "delayed.queue";
          public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
          public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
      
          @Bean
          public Queue delayedQueue() {
              return new Queue(DELAYED_QUEUE_NAME);
          }
      
          //自定义交换机 我们在这里定义的是一个延迟交换机
          @Bean
          public CustomExchange delayedExchange() {
              Map<String, Object> args = new HashMap<>();
              //自定义交换机的类型
              args.put("x-delayed-type", "direct");
              return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                      args);
          }
      
          @Bean
          public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                             @Qualifier("delayedExchange") CustomExchange
                                                     delayedExchange) {
              return
                      BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
          }
      }
      
  • controller

    •     public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
          public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
      
          @GetMapping("sendDelayMsg/{message}/{delayTime}")
          public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
              rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                      correlationData -> {
                          correlationData.getMessageProperties().setDelay(delayTime);
                          return correlationData;
                      });
              log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new
                      Date(), delayTime, message);
          }
      
  • воплощать в жизнь

    • http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
      http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
      
  • Результаты

    • image-20210806220520327.png
  • решить! ! !


Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах