Резюме:
Этот пост в блоге является десятым из «Серии реальных боевых действий системы Java Seckill». В этом посте блога мы будем использовать очередь недоставленных сообщений RabbitMQ для решения ситуации «пользователь успешно сгенерировал заказ в seckill, но не оплатил его». для этого». Давайте объединимся Оцените мощь очередей недоставленных сообщений RabbitMQ в реальной бизнес-среде!
содержание:
Что касается промежуточного программного обеспечения сообщений RabbitMQ, Debug был кратко описан и представлен в предыдущей главе, поэтому я не буду повторяться здесь! В этой статье мы будем использовать очередь недоставленных сообщений RabbitMQ для достижения таких бизнес-требований: «После того, как пользователь успешно убивает и успешно создает запись заказа, теоретически должна быть выполнена операция для оплаты, но есть ситуация, когда пользователь поздно.Задержка с оплатой~ Что касается причины, я не знаю!
Для этого сценария вы можете испытать его на некоторых платформах торговых центров, то есть после выбора продукта, добавления его в корзину, нажмите, чтобы оплатить, в это время будет обратный отсчет, напоминающий вам, что вам нужно завершить оплата в указанные сроки, иначе заказ не состоится!
Для обработки такого рода бизнес-логики традиционным методом является использование «метода таймера», регулярный опрос для получения заказов, которые превысили указанное время, а затем выполнение ряда мер по обработке (например, попытка отправки SMS-сообщений). пользователям, напоминая больше, чем Срок действия долгосрочных заказов истекает и т. д.), в этой системе seckill мы будем использовать компонент очереди недоставленных сообщений RabbitMQ для реализации мер «аннулирования» для заказа!
«Очередь недоставленных писем», — считает Гу Минси, — это особая очередь, которая может задерживать и задерживать сообщения на определенный период времени обработки, но вы можете подождать определенный период времени перед обработкой» функции! Обычная очередь не работает, то есть сообщение после попадания в очередь будет отслеживаться и потребляться соответствующим потребителем немедленно.На следующем рисунке показана базовая модель сообщения обычной очереди:
Что касается "очереди недоставленных сообщений", то ее состав и использование относительно сложны. В нормальных условиях очередь недоставленных сообщений состоит из трех основных компонентов.сочинение: переключение недоставленных сообщений + маршрутизация недоставленных сообщений + TTL (время существования сообщения ~ не требуется), а очередь недоставленных сообщений может быть настроена с помощью «базового переключения, ориентированного на производителя + базовой маршрутизации».связыватьПоэтому производитель сначала отправляет сообщение в модель сообщения, связанную «базовым коммутатором + базовой маршрутизацией», то есть косвенно попадает в очередь недоставленных сообщений. для входа на следующую станцию передачи, то есть «переключение недоставленных сообщений потребителя + маршрутизация недоставленных сообщений»связыватьСообщение от модели. Как показано ниже:
Затем мы используем фактический код для построения модели сообщений очереди недоставленных сообщений и применяем эту модель сообщений к вышеуказанным функциональным модулям системы seckill.
(1) Во-первых, необходимо создать модель сообщений очереди недоставленных сообщений в классе конфигурации RabbitmqConfig.Полный исходный код выглядит следующим образом:
//构建秒杀成功之后-订单超时未支付的死信队列消息模型
@Bean
public Queue successKillDeadQueue(){
Map<String, Object> argsMap= Maps.newHashMap();
argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}
//基本交换机
@Bean
public TopicExchange successKillDeadProdExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//创建基本交换机+基本路由 -> 死信队列 的绑定
@Bean
public Binding successKillDeadProdBinding(){
return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的队列
@Bean
public Queue successKillRealQueue(){
return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交换机
@Bean
public TopicExchange successKillDeadExchange(){
return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交换机+死信路由->真正队列 的绑定
@Bean
public Binding successKillDeadBinding(){
return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
Среди них переменные, считываемые экземпляром объекта переменной среды env, настраиваются в файле конфигурации application.properties, и их значения следующие:
#订单超时未支付自动失效-死信队列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#单位为ms
mq.kill.item.success.kill.expire=20000
(2) После того, как модель сообщения будет успешно создана, нам нужно разработать функцию «отправки сообщений в очередь недоставленных сообщений» в общем классе службы отправки сообщений RabbitMQ RabbitSenderService, В этом методе функции мы указываем время выживания TTL сообщения, значением является настраиваемая переменная: значение mq.kill.item.success.kill.expire, то есть 20 с, его полный исходный код выглядит следующим образом:
//秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单
public void sendKillSuccessOrderExpireMsg(final String orderCode){
try {
if (StringUtils.isNotBlank(orderCode)){
KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
if (info!=null){
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties mp=message.getMessageProperties();
mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
//TODO:动态设置TTL(为了测试方便,暂且设置20s)
mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
return message;
}
});
}
}
}catch (Exception e){
log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e.fillInStackTrace());
}
}
Из кода «отправить сообщение в очередь недоставленных сообщений» мы видим, что сообщение сначала привязано к «базовому коммутатору + базовому маршруту».Модель сообщений для очередей недоставленных сообщенийсередина!当消息到了TTL,自然会从死信队列中出来(即“解脱了”),然后进入下一个中转站,即:“死信交换机+死信路由” 所绑定而成的Модель сообщений реальной очередиВ конце концов, это действительно отслеживается и потребляется потребителями!
На этом этапе вы можете запустить весь проект и систему на внешнем сервере tomcat, затем открыть серверное консольное приложение RabbitMQ, найти очередь недоставленных сообщений и просмотреть сведения об очереди недоставленных сообщений, как показано на следующем рисунке. :
(3) Наконец, необходимо отслеживать и обрабатывать сообщения в «реальной очереди» в общем классе службы мониторинга сообщений RabbitMQ RabbitReceiverService: здесь мы аннулируем заказ (при условии, что платеж еще не был произведен!), и его завершение исходный код выглядит следующим образом:
//用户秒杀成功后超时未支付-监听者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
try {
log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);
if (info!=null){
ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
if (entity!=null && entity.getStatus().intValue()==0){
itemKillSuccessMapper.expireOrder(info.getCode());
}
}
}catch (Exception e){
log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
}
}
Среди них операция обновления записей некорректного ордера реализована с помощью itemKillSuccessMapper.expireOrder(info.getCode());, а соответствующий динамический Sql записывается следующим образом:
<!--失效更新订单信息-->
<update id="expireOrder">
UPDATE item_kill_success
SET status = -1
WHERE code = #{code} AND status = 0
</update>
(4) На данный момент битва кода модели сообщений очереди недоставленных сообщений RabbitMQ завершена! Наконец, мне нужно вызвать его только в том месте, где «в тот момент, когда пользователь успешно создает заказ в Seckill, отправить сообщение в очередь недоставленных сообщений», и код вызова выглядит следующим образом:
/**
* 通用的方法-记录用户秒杀成功后生成的订单-并进行异步邮件消息的通知
* @param kill
* @param userId
* @throws Exception
*/
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
//TODO:记录抢购成功后生成的秒杀订单记录
ItemKillSuccess entity=new ItemKillSuccess();
String orderNo=String.valueOf(snowFlake.nextId());
//entity.setCode(RandomUtil.generateOrderCode()); //传统时间戳+N位随机数
entity.setCode(orderNo); //雪花算法
entity.setItemId(kill.getItemId());
entity.setKillId(kill.getId());
entity.setUserId(userId.toString());
entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
entity.setCreateTime(DateTime.now().toDate());
//TODO:学以致用,举一反三 -> 仿照单例模式的双重检验锁写法
if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
int res=itemKillSuccessMapper.insertSelective(entity);
if (res>0){
//TODO:进行异步邮件消息的通知=rabbitmq+mail
rabbitSenderService.sendKillSuccessEmailMsg(orderNo);
//TODO:入死信队列,用于 “失效” 超过指定的TTL时间时仍然未支付的订单
rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
}
}
}
Наконец, это самопроверка: нажмите кнопку «Купить», после успешного второго уничтожения пользователя сообщение будет отправлено в очередь недоставленных сообщений (это можно увидеть в бэкэнд-консоли RabbitMQ). хорошее сообщение о готовности), подождите 20 секунд. Вы можете видеть, что сообщение передается в реальную очередь и отслеживается и потребляется реальным потребителем, как показано ниже:
Что ж, введение «очереди недоставленных сообщений RabbitMQ» и применение реального боя будет представлено здесь на данный момент.Этот метод может быть очень гибким для «неоплаченных заказов с истекшим временем ожидания», и весь процесс «автоматический». ., естественно», без необходимости вручную нажимать кнопку, чтобы запустить его! Конечно, не все идеально, как и с очередями недоставленных сообщений, в одной статье мы опишем подводные камни этого метода и используем соответствующие решения для их решения!
Пополнить:
1. В настоящее время общая конструкция и кодирование этой системы seckill завершены.Полный адрес базы данных исходного кода можно скачать здесь:git ee.com/steady jack/… Помните Вилка и Звезда!!
2. Наконец, обратите внимание на технический публичный аккаунт Debug в WeChat: