Резюме
В этой статье в основном объясняется процесс интеграции RabbitMQ с mall для реализации отложенных сообщений, на примере отправки отложенных сообщений для отмены сверхурочных заказов. RabbitMQ — это широко используемая очередь сообщений с открытым исходным кодом. Он легкий и простой в развертывании, а также поддерживает несколько протоколов обмена сообщениями. RabbitMQ можно развернуть в распределенных и федеративных конфигурациях для удовлетворения потребностей в масштабируемости и высокой доступности.
Введение в структуру использования проекта
RabbitMQ
RabbitMQ — это широко используемая очередь сообщений с открытым исходным кодом. Он легкий и простой в развертывании, а также поддерживает несколько протоколов обмена сообщениями. RabbitMQ можно развернуть в распределенных и федеративных конфигурациях для удовлетворения потребностей в масштабируемости и высокой доступности.
Установка и использование RabbitMQ
- Установите Erlang, адрес загрузки:Erlang.org/download/OT…
- Установите RabbitMQ, адрес загрузки:Авторизуйтесь.bin Tray.com/rabbit present/ali…
- После завершения установки войдите в каталог sbin в каталоге установки RabbitMQ.
- Введите cmd в адресной строке и нажмите Enter, чтобы запустить командную строку, а затем введите следующую команду, чтобы запустить функцию управления:
rabbitmq-plugins enable rabbitmq_management
- Посетите адрес, чтобы убедиться, что установка прошла успешно:http://localhost:15672/
-
Введите пароль учетной записи и войдите в систему: гость гость
-
Создайте учетную запись и назначьте ей роль администратора: торговый центр торговый центр
- Создайте новый виртуальный хост как: /mall
- Щелкните пользователя торгового центра, чтобы перейти на страницу конфигурации пользователя.
- Настройте разрешения виртуального хоста для пользователя торгового центра.
- На этом установка и настройка RabbitMQ завершена.
Модель сообщений RabbitMQ
логотип | китайское имя | английское имя | описывать |
---|---|---|---|
P | режиссер | Producer | Отправитель сообщения, который может отправить сообщение на биржу |
C | потребитель | Consumer | Получатель сообщения получает сообщение из очереди для потребления |
X | выключатель | Exchange | Получить сообщение, отправленное производителем, и отправить его в указанную очередь в соответствии с ключом маршрутизации. |
Q | очередь | Queue | Хранить сообщения, отправленные с коммутатора |
type | тип переключателя | type | direct означает отправку сообщения напрямую в соответствии с ключом маршрутизации (оранжевый/черный) |
Lombok
Lombok добавляет очень интересные дополнительные функции в язык Java, вы больше не можете писать геттеры, сеттеры и другие методы для классов сущностей, вы можете иметь их через аннотацию.
Примечание. Вам необходимо установить подключаемый модуль Lombok от idea и добавить зависимости к файлу pom в проекте.
Описание бизнес-сценариев
Он используется для решения проблемы отмены заказа после того, как пользователь разместил заказ и время ожидания заказа истекло.
- Пользователь размещает заказ (будет серия операций по блокировке инвентаря продукта, использованию купонов и начислению баллов);
- Сгенерировать заказ и получить id заказа;
- Получить установленное время ожидания заказа (при условии, что установлено 60 минут без оплаты для отмены заказа);
- Отправьте сообщение о задержке в RabbitMQ в соответствии с временем ожидания заказа, чтобы оно запускало операцию отмены заказа после истечения времени ожидания заказа;
- Если пользователь не платит, отменить заказ (освободить заблокированный товарный запас, вернуть купоны, вернуть баллы и ряд операций).
Интегрируйте RabbitMQ для реализации отложенных сообщений
Добавьте связанные зависимости в pom.xml
<!--消息队列相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
Измените файл конфигурации SpringBoot.
Измените файл application.yml и добавьте конфигурацию, связанную с Mongodb, в узел Spring.
rabbitmq:
host: localhost # rabbitmq的连接地址
port: 5672 # rabbitmq的连接端口号
virtual-host: /mall # rabbitmq的虚拟host
username: mall # rabbitmq的用户名
password: mall # rabbitmq的密码
publisher-confirms: true #如果对异步消息需要回调必须设置为true
Добавьте класс конфигурации перечисления очереди сообщений QueueEnum.
Определения констант для задержки очередей сообщений и обработки очередей сообщений об отмене заказа, включая имя обмена, имя очереди и имя ключа маршрутизации.
package com.macro.mall.tiny.dto;
import lombok.Getter;
/**
* 消息队列枚举配置
* Created by macro on 2018/9/14.
*/
@Getter
public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}
Добавить конфигурацию RabbitMQ
Используется для настройки коммутаторов, очередей и отношений привязки между очередями и коммутаторами.
package com.macro.mall.tiny.config;
import com.macro.mall.tiny.dto.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 消息队列配置
* Created by macro on 2018/9/14.
*/
@Configuration
public class RabbitMqConfig {
/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 订单延迟队列队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}
/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}
/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}
/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}
/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}
}
Следующие обмены и очереди можно увидеть на странице управления RabbitMQ.
Коммутатор и очередь Описание
- mall.order.direct (биржа, связанная с очередью сообщений об отмене заказа): привязанная очередь — mall.order.cancel. После отправки сообщения с mall.order.cancel в качестве ключа маршрутизации оно будет отправлено в эту очередь. .
- mall.order.direct.ttl (биржа, связанная с очередью сообщений о задержке заказа): связанная очередь — mall.order.cancel.ttl. После отправки сообщения с mall.order.cancel.ttl в качестве ключа маршрутизации будет Оно перенаправляется в эту очередь и хранится в этой очереди в течение определенного периода времени.По истечении времени ожидания сообщение будет автоматически отправлено в mall.order.cancel (очередь потребления сообщений об отмене заказа).
Добавить отправителя задержанного сообщения CancelOrderSender
Используется для отправки сообщений в очередь сообщений о задержке заказа (mall.order.cancel.ttl).
package com.macro.mall.tiny.component;
import com.macro.mall.tiny.dto.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 取消订单消息的发出者
* Created by macro on 2018/9/14.
*/
@Component
public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId,final long delayTimes){
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}
Добавьте CancelOrderReceiver в получатель сообщения об отмене заказа.
Используется для получения сообщений из очереди сообщений об отмененных заказах (mall.order.cancel).
package com.macro.mall.tiny.component;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 取消订单消息的处理者
* Created by macro on 2018/9/14.
*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}
Добавить интерфейс OmsPortalOrderService
package com.macro.mall.tiny.service;
import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.dto.OrderParam;
import org.springframework.transaction.annotation.Transactional;
/**
* 前台订单管理Service
* Created by macro on 2018/8/30.
*/
public interface OmsPortalOrderService {
/**
* 根据提交信息生成订单
*/
@Transactional
CommonResult generateOrder(OrderParam orderParam);
/**
* 取消单个超时订单
*/
@Transactional
void cancelOrder(Long orderId);
}
Добавьте класс реализации OmsPortalOrderServiceImpl для OmsPortalOrderService.
package com.macro.mall.tiny.service.impl;
import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.component.CancelOrderSender;
import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 前台订单管理Service
* Created by macro on 2018/8/30.
*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;
@Override
public CommonResult generateOrder(OrderParam orderParam) {
//todo 执行一系类下单操作,具体参考mall项目
LOGGER.info("process generateOrder");
//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
sendDelayMessageCancelOrder(11L);
return CommonResult.success(null, "下单成功");
}
@Override
public void cancelOrder(Long orderId) {
//todo 执行一系类取消订单操作,具体参考mall项目
LOGGER.info("process cancelOrder orderId:{}",orderId);
}
private void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间,假设为60分钟
long delayTimes = 30 * 1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}
}
Добавлен интерфейс определения OmsPortalOrderController.
package com.macro.mall.tiny.controller;
import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* 订单管理Controller
* Created by macro on 2018/8/30.
*/
@Controller
@Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;
@ApiOperation("根据购物车信息生成订单")
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}
Проведите тестирование интерфейса
Вызов интерфейса заказа
Примечание. Время задержки сообщения установлено на 30 секунд.
Адрес исходного кода проекта
публика
проект торгового центраПолный набор учебных пособий сериализуется,Обратите внимание на публичный аккаунтПолучите это прямо сейчас.