Серия боевой системы Java Spike ~ Очередь недоставленных писем RabbitMQ обрабатывает неоплаченные сверхурочные заказы

RabbitMQ

Резюме:

Этот пост в блоге является десятым из «Серии реальных боевых действий системы Java Seckill». В этом посте блога мы будем использовать очередь недоставленных сообщений RabbitMQ для решения ситуации «пользователь успешно сгенерировал заказ в seckill, но не оплатил его». для этого». Давайте объединимся Оцените мощь очередей недоставленных сообщений RabbitMQ в реальной бизнес-среде!

содержание:

Что касается промежуточного программного обеспечения сообщений RabbitMQ, Debug был кратко описан и представлен в предыдущей главе, поэтому я не буду повторяться здесь! В этой статье мы будем использовать очередь недоставленных сообщений RabbitMQ для достижения таких бизнес-требований: «После того, как пользователь успешно убивает и успешно создает запись заказа, теоретически должна быть выполнена операция для оплаты, но есть ситуация, когда пользователь поздно.Задержка с оплатой~ Что касается причины, я не знаю!

Для этого сценария вы можете испытать его на некоторых платформах торговых центров, то есть после выбора продукта, добавления его в корзину, нажмите, чтобы оплатить, в это время будет обратный отсчет, напоминающий вам, что вам нужно завершить оплата в указанные сроки, иначе заказ не состоится!

Для обработки такого рода бизнес-логики традиционным методом является использование «метода таймера», регулярный опрос для получения заказов, которые превысили указанное время, а затем выполнение ряда мер по обработке (например, попытка отправки SMS-сообщений). пользователям, напоминая больше, чем Срок действия долгосрочных заказов истекает и т. д.), в этой системе seckill мы будем использовать компонент очереди недоставленных сообщений RabbitMQ для реализации мер «аннулирования» для заказа!

«Очередь недоставленных писем», — считает Гу Минси, — это особая очередь, которая может задерживать и задерживать сообщения на определенный период времени обработки, но вы можете подождать определенный период времени перед обработкой» функции! Обычная очередь не работает, то есть сообщение после попадания в очередь будет отслеживаться и потребляться соответствующим потребителем немедленно.На следующем рисунке показана базовая модель сообщения обычной очереди:

89276ac961f259926a62454e30ffc33e4d1.jpg

Что касается "очереди недоставленных сообщений", то ее состав и использование относительно сложны. В нормальных условиях очередь недоставленных сообщений состоит из трех основных компонентов.сочинение: переключение недоставленных сообщений + маршрутизация недоставленных сообщений + TTL (время существования сообщения ~ не требуется), а очередь недоставленных сообщений может быть настроена с помощью «базового переключения, ориентированного на производителя + базовой маршрутизации».связыватьПоэтому производитель сначала отправляет сообщение в модель сообщения, связанную «базовым коммутатором + базовой маршрутизацией», то есть косвенно попадает в очередь недоставленных сообщений. для входа на следующую станцию ​​передачи, то есть «переключение недоставленных сообщений потребителя + маршрутизация недоставленных сообщений»связыватьСообщение от модели. Как показано ниже:

Затем мы используем фактический код для построения модели сообщений очереди недоставленных сообщений и применяем эту модель сообщений к вышеуказанным функциональным модулям системы seckill.

(1) Во-первых, необходимо создать модель сообщений очереди недоставленных сообщений в классе конфигурации RabbitmqConfig.Полный исходный код выглядит следующим образом:

//构建秒杀成功之后-订单超时未支付的死信队列消息模型

@Bean
public Queue successKillDeadQueue(){
    Map<String, Object> argsMap= Maps.newHashMap();
    argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
    argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}

//基本交换机
@Bean
public TopicExchange successKillDeadProdExchange(){
    return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//创建基本交换机+基本路由 -> 死信队列 的绑定
@Bean
public Binding successKillDeadProdBinding(){
    return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的队列
@Bean
public Queue successKillRealQueue(){
    return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交换机
@Bean
public TopicExchange successKillDeadExchange(){
    return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交换机+死信路由->真正队列 的绑定
@Bean
public Binding successKillDeadBinding(){
    return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}

Среди них переменные, считываемые экземпляром объекта переменной среды env, настраиваются в файле конфигурации application.properties, и их значения следующие:

#订单超时未支付自动失效-死信队列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key

mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key

#单位为ms
mq.kill.item.success.kill.expire=20000


(2) После того, как модель сообщения будет успешно создана, нам нужно разработать функцию «отправки сообщений в очередь недоставленных сообщений» в общем классе службы отправки сообщений RabbitMQ RabbitSenderService, В этом методе функции мы указываем время выживания TTL сообщения, значением является настраиваемая переменная: значение mq.kill.item.success.kill.expire, то есть 20 с, его полный исходный код выглядит следующим образом:

//秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单
public void sendKillSuccessOrderExpireMsg(final String orderCode){
    try {
        if (StringUtils.isNotBlank(orderCode)){
            KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
            if (info!=null){
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
                rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        MessageProperties mp=message.getMessageProperties();
                        mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);

                        //TODO:动态设置TTL(为了测试方便,暂且设置20s)
                        mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
                        return message;
                    }
                });
            }
        }
    }catch (Exception e){
        log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}",orderCode,e.fillInStackTrace());
    }
}

Из кода «отправить сообщение в очередь недоставленных сообщений» мы видим, что сообщение сначала привязано к «базовому коммутатору + базовому маршруту».Модель сообщений для очередей недоставленных сообщенийсередина!当消息到了TTL,自然会从死信队列中出来(即“解脱了”),然后进入下一个中转站,即:“死信交换机+死信路由” 所绑定而成的Модель сообщений реальной очередиВ конце концов, это действительно отслеживается и потребляется потребителями!

На этом этапе вы можете запустить весь проект и систему на внешнем сервере tomcat, затем открыть серверное консольное приложение RabbitMQ, найти очередь недоставленных сообщений и просмотреть сведения об очереди недоставленных сообщений, как показано на следующем рисунке. :


(3) Наконец, необходимо отслеживать и обрабатывать сообщения в «реальной очереди» в общем классе службы мониторинга сообщений RabbitMQ RabbitReceiverService: здесь мы аннулируем заказ (при условии, что платеж еще не был произведен!), и его завершение исходный код выглядит следующим образом:

//用户秒杀成功后超时未支付-监听者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
    try {
        log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}",info);

        if (info!=null){
            ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
            if (entity!=null && entity.getStatus().intValue()==0){
                itemKillSuccessMapper.expireOrder(info.getCode());
            }
        }
    }catch (Exception e){
        log.error("用户秒杀成功后超时未支付-监听者-发生异常:",e.fillInStackTrace());
    }
}

Среди них операция обновления записей некорректного ордера реализована с помощью itemKillSuccessMapper.expireOrder(info.getCode());, а соответствующий динамический Sql записывается следующим образом:

<!--失效更新订单信息-->
<update id="expireOrder">
  UPDATE item_kill_success
  SET status = -1
  WHERE code = #{code} AND status = 0
</update>


(4) На данный момент битва кода модели сообщений очереди недоставленных сообщений RabbitMQ завершена! Наконец, мне нужно вызвать его только в том месте, где «в тот момент, когда пользователь успешно создает заказ в Seckill, отправить сообщение в очередь недоставленных сообщений», и код вызова выглядит следующим образом:

/**
 * 通用的方法-记录用户秒杀成功后生成的订单-并进行异步邮件消息的通知
 * @param kill
 * @param userId
 * @throws Exception
 */
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
    //TODO:记录抢购成功后生成的秒杀订单记录

    ItemKillSuccess entity=new ItemKillSuccess();
    String orderNo=String.valueOf(snowFlake.nextId());

    //entity.setCode(RandomUtil.generateOrderCode());   //传统时间戳+N位随机数
    entity.setCode(orderNo); //雪花算法
    entity.setItemId(kill.getItemId());
    entity.setKillId(kill.getId());
    entity.setUserId(userId.toString());
    entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
    entity.setCreateTime(DateTime.now().toDate());
    //TODO:学以致用,举一反三 -> 仿照单例模式的双重检验锁写法
    if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
        int res=itemKillSuccessMapper.insertSelective(entity);

        if (res>0){
            //TODO:进行异步邮件消息的通知=rabbitmq+mail
            rabbitSenderService.sendKillSuccessEmailMsg(orderNo);

            //TODO:入死信队列,用于 “失效” 超过指定的TTL时间时仍然未支付的订单
            rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
        }
    }
}


Наконец, это самопроверка: нажмите кнопку «Купить», после успешного второго уничтожения пользователя сообщение будет отправлено в очередь недоставленных сообщений (это можно увидеть в бэкэнд-консоли RabbitMQ). хорошее сообщение о готовности), подождите 20 секунд. Вы можете видеть, что сообщение передается в реальную очередь и отслеживается и потребляется реальным потребителем, как показано ниже:

Что ж, введение «очереди недоставленных сообщений RabbitMQ» и применение реального боя будет представлено здесь на данный момент.Этот метод может быть очень гибким для «неоплаченных заказов с истекшим временем ожидания», и весь процесс «автоматический». ., естественно», без необходимости вручную нажимать кнопку, чтобы запустить его! Конечно, не все идеально, как и с очередями недоставленных сообщений, в одной статье мы опишем подводные камни этого метода и используем соответствующие решения для их решения!

Пополнить:

1. В настоящее время общая конструкция и кодирование этой системы seckill завершены.Полный адрес базы данных исходного кода можно скачать здесь:git ee.com/steady jack/… Помните Вилка и Звезда!!

2. Наконец, обратите внимание на технический публичный аккаунт Debug в WeChat: