Полная версия реализации сообщения о задержке Spring Boot RabbitMQ

RabbitMQ

Обзор


Когда я пошел в Netease на собеседование, интервьюер задал мне вопрос, сказав:

После оформления заказа, если пользователь не оплатил и ему необходимо отменить заказ, что я могу сделать?

Мой ответ в то время состоял в том, чтобы сканировать таблицу БД с помощью запланированной задачи. Интервьюер был не очень доволен и предложил:

Невозможно добиться уведомления квазиреального времени с временными задачами, есть ли другой способ?

Мой ответ в то время был:

Вы можете использовать очередь.После того, как заказ размещен, отправьте сообщение в очередь и укажите время истечения срока действия.Когда время истечет, интерфейс обратного вызова выполняется.

Выслушав вопросы интервью, не задавайте их снова. На самом деле, мои мысли в то время были правильными, но я был не очень профессионален. Профессиональная поговорка заключается в использовании延迟消息.

На самом деле, есть некоторые проблемы с использованием временных задач.Первоначальная бизнес-система рассчитывала, что через 10 минут, если заказ не будет оплачен, заказ будет немедленно отменен, а товарный запас будет освобожден. Однако, как только объем данных будет большим, время получения данных о неоплаченных заказах будет увеличено, а некоторые заказы не будут отменены через 10 минут, может быть, 15 минут, 20 минут и т. д. В этом случае инвентарь не будет выпущен вовремя, и это повлияет на нечетное количество. С отложенным сообщением теоретически возможно отменить заказ в соответствии с установленным временем.

В настоящее время большинство статей в Интернете об использовании RabbitMQ для реализации отложенных сообщений посвящены тому, как использовать очередь недоставленных сообщений RabbitMQ для ее реализации.Схема реализации кажется громоздкой и сложной, и она по-прежнему реализуется с использованием оригинального клиента RabbitMQ. API, который еще более подробный.

Spring Boot упаковал клиентский API RabbitMQ, который намного проще в использовании.rabbitmq_delayed_message_exchangeПлагины и Spring Boot для реализации отложенных сообщений.


подготовка программного обеспечения


erlang

Пожалуйста, обратитесь кУстановить erlang под Win10

Версия, используемая в этой статье:

Erlang 20.3


RabbitMQ

Пожалуйста, обратитесь кУстановить rabbitmq под win10

В этой статье используетсяwindowВерсия RabbitMQ, номер версии:

3.7.4


плагин rabbitmq_delayed_message_exchange

Адрес загрузки плагина:

woohoo.rabbitcurrent.com/community-afraid…

После открытия URL-адреса, ctrl + f, поискrabbitmq_delayed_message_exchange.
这里写图片描述

Помните, что вы должны выбрать номер версии.Поскольку я использую RabbitMQ 3.7.4, соответствующийrabbitmq_delayed_message_exchangeПлагины также должны выбирать 3.7.x.

Если вы не выберете правильную версию, вы столкнетесь со всевозможными странными проблемами при использовании отложенных сообщений, и вы не сможете найти решение в Интернете. Всю ночь бился с этой проблемой. Не забудьте выбрать правильную версию плагина.

После загрузки плагина поместите его в каталог установки RabbitMQ.pluginsкаталог и запустите плагин с помощью следующей команды:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

В случае успешного запуска появится следующее сообщение:

The following plugins have been enabled: rabbitmq_delayed_message_exchange

После успешного запуска плагина не забудьте перезапустить RabbitMQ, чтобы он вступил в силу.


Интеграция RabbitMQ


Это очень просто, добавьте его прямо в файл pom.xml проекта maven.

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Версия Spring Boot, которую я использую,2.0.1.RELEASE.

следующий вapplication.propertiesДобавьте конфигурацию Redis в файл:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Определите ConnectionFactory и RabbitTemplate


Это также очень просто, код выглядит следующим образом:

package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Конфигурация Exchange и очереди


package com.mq.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("test_queue_1", true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
    }
}

Здесь важно отметить, что использованиеCustomExchange,нетDirectExchange,Кроме тогоCustomExchangeТип должен бытьx-delayed-message.


реализовать отправку сообщений


package com.mq.rabbitmq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class MessageServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String queueName,String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息发送时间:"+sdf.format(new Date()));
        rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay",3000);
                return message;
            }
        });
    }
}

Обратите внимание, что при отправке необходимо добавить заголовок

x-delay

Время задержки, которое я установил здесь, составляет 3 секунды.


потребитель сообщений


package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class MessageReceiver {

    @RabbitListener(queues = "test_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息接收时间:"+sdf.format(new Date()));
        System.out.println("接收到的消息:"+msg);
    }
}

Запуск программ Spring Boot и отправка сообщений


Запустите программу Spring Boot непосредственно в основном методе, и Spring Boot автоматически разрешит ее.MessageReceiverКатегория.

Далее вам нужно использовать Junit только для запуска интерфейса для отправки сообщений.

package com.mq.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private MessageServiceImpl messageService;

    @Test
    public void send() {
        messageService.sendMsg("test_queue_1","hello i am delay msg");
    }

}

После запуска вы можете увидеть следующую информацию:

Время отправки сообщения: 2018-05-03 12:44:53

Через 3 секунды консоль Spring Boot выводит:

Время приема сообщения: 2018-05-03 12:44:56 Сообщение получено: привет, я задерживаю сообщение