Обзор
Когда я пошел в Netease на собеседование, интервьюер задал мне вопрос, сказав:
После оформления заказа, если пользователь не оплатил и ему необходимо отменить заказ, что я могу сделать?
Мой ответ в то время состоял в том, чтобы сканировать таблицу БД с помощью запланированной задачи. Интервьюер был не очень доволен и предложил:
Невозможно добиться уведомления квазиреального времени с временными задачами, есть ли другой способ?
Мой ответ в то время был:
Вы можете использовать очередь.После того, как заказ размещен, отправьте сообщение в очередь и укажите время истечения срока действия.Когда время истечет, интерфейс обратного вызова выполняется.
Выслушав вопросы интервью, не задавайте их снова. На самом деле, мои мысли в то время были правильными, но я был не очень профессионален. Профессиональная поговорка заключается в использовании延迟消息
.
На самом деле, есть некоторые проблемы с использованием временных задач.Первоначальная бизнес-система рассчитывала, что через 10 минут, если заказ не будет оплачен, заказ будет немедленно отменен, а товарный запас будет освобожден. Однако, как только объем данных будет большим, время получения данных о неоплаченных заказах будет увеличено, а некоторые заказы не будут отменены через 10 минут, может быть, 15 минут, 20 минут и т. д. В этом случае инвентарь не будет выпущен вовремя, и это повлияет на нечетное количество. С отложенным сообщением теоретически возможно отменить заказ в соответствии с установленным временем.
В настоящее время большинство статей в Интернете об использовании RabbitMQ для реализации отложенных сообщений посвящены тому, как использовать очередь недоставленных сообщений RabbitMQ для ее реализации.Схема реализации кажется громоздкой и сложной, и она по-прежнему реализуется с использованием оригинального клиента RabbitMQ. API, который еще более подробный.
Spring Boot упаковал клиентский API RabbitMQ, который намного проще в использовании.rabbitmq_delayed_message_exchange
Плагины и Spring Boot для реализации отложенных сообщений.
подготовка программного обеспечения
erlang
Пожалуйста, обратитесь кУстановить erlang под Win10
Версия, используемая в этой статье:
Erlang 20.3
RabbitMQ
Пожалуйста, обратитесь кУстановить rabbitmq под win10
В этой статье используетсяwindow
Версия RabbitMQ, номер версии:
3.7.4
плагин rabbitmq_delayed_message_exchange
Адрес загрузки плагина:
После открытия URL-адреса, ctrl + f, поискrabbitmq_delayed_message_exchange
.
Помните, что вы должны выбрать номер версии.Поскольку я использую RabbitMQ 3.7.4, соответствующийrabbitmq_delayed_message_exchange
Плагины также должны выбирать 3.7.x.
Если вы не выберете правильную версию, вы столкнетесь со всевозможными странными проблемами при использовании отложенных сообщений, и вы не сможете найти решение в Интернете. Всю ночь бился с этой проблемой. Не забудьте выбрать правильную версию плагина.
После загрузки плагина поместите его в каталог установки RabbitMQ.plugins
каталог и запустите плагин с помощью следующей команды:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
В случае успешного запуска появится следующее сообщение:
The following plugins have been enabled: rabbitmq_delayed_message_exchange
После успешного запуска плагина не забудьте перезапустить RabbitMQ, чтобы он вступил в силу.
Интеграция RabbitMQ
Это очень просто, добавьте его прямо в файл pom.xml проекта maven.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Версия Spring Boot, которую я использую,2.0.1.RELEASE
.
следующий вapplication.properties
Добавьте конфигурацию Redis в файл:
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Определите ConnectionFactory и RabbitTemplate
Это также очень просто, код выглядит следующим образом:
package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
private String host;
private int port;
private String userName;
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
cachingConnectionFactory.setUsername(userName);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost("/");
cachingConnectionFactory.setPublisherConfirms(true);
return cachingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
return rabbitTemplate;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
Конфигурация Exchange и очереди
package com.mq.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class QueueConfig {
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);
}
@Bean
public Queue queue() {
Queue queue = new Queue("test_queue_1", true);
return queue;
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
}
}
Здесь важно отметить, что использованиеCustomExchange
,нетDirectExchange
,Кроме тогоCustomExchange
Тип должен бытьx-delayed-message
.
реализовать отправку сообщений
package com.mq.rabbitmq;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class MessageServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String queueName,String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送时间:"+sdf.format(new Date()));
rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay",3000);
return message;
}
});
}
}
Обратите внимание, что при отправке необходимо добавить заголовок
x-delay
Время задержки, которое я установил здесь, составляет 3 секунды.
потребитель сообщений
package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class MessageReceiver {
@RabbitListener(queues = "test_queue_1")
public void receive(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息接收时间:"+sdf.format(new Date()));
System.out.println("接收到的消息:"+msg);
}
}
Запуск программ Spring Boot и отправка сообщений
Запустите программу Spring Boot непосредственно в основном методе, и Spring Boot автоматически разрешит ее.MessageReceiver
Категория.
Далее вам нужно использовать Junit только для запуска интерфейса для отправки сообщений.
package com.mq.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
private MessageServiceImpl messageService;
@Test
public void send() {
messageService.sendMsg("test_queue_1","hello i am delay msg");
}
}
После запуска вы можете увидеть следующую информацию:
Время отправки сообщения: 2018-05-03 12:44:53
Через 3 секунды консоль Spring Boot выводит:
Время приема сообщения: 2018-05-03 12:44:56 Сообщение получено: привет, я задерживаю сообщение