RabbitMQ настолько прост в реализации отложенных сообщений, что весь плагин готов!

Java RabbitMQ
RabbitMQ настолько прост в реализации отложенных сообщений, что весь плагин готов!

Адрес фактического центра электронной коммерции SpringBoot (35k+star):GitHub.com/macro-positive/…

Резюме

Есть два способа реализовать отложенные сообщения в RabbitMQ, один из них — использовать死信队列реализации, другой - использовать延迟插件выполнить.死信队列Реализация, о которой мы говорили ранее, конкретная ссылка«торговый центр интегрирует RabbitMQ для реализации отложенных сообщений», на этот раз мы говорим о более простом, использовании延迟插件выполнить.

дошкольная подготовка

Чтобы изучить эту статью, вам нужно кое-что знать о RabbitMQ.Для тех, кто еще не знает, вы можете прочитать ее:«Практические советы по RabbitMQ подытожили за 3 дня, с чем-то! 》

Установка плагина

Сначала нам нужно скачать и установить плагин задержки для RabbitMQ.

  • Перейдите на официальный сайт RabbitMQ, чтобы загрузить подключаемый модуль, адрес подключаемого модуля:woohoo.rabbitcurrent.com/community-afraid…

  • прямой поискrabbitmq_delayed_message_exchangeВы можете найти плагин, который нам нужно скачать, скачать версию, которая соответствует RabbitMQ, не ошибитесь;

  • Скопируйте файлы плагина в каталог установки RabbitMQ.pluginsПод содержанием;

  • Войдите в каталог установки RabbitMQ.sbinкаталог, используйте следующую команду, чтобы включить плагин задержки;
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • После успешного включения подключаемого модуля вы можете увидеть следующую информацию, а затем перезапустить службу RabbitMQ.

Реализовать отложенные сообщения

Далее нам нужно реализовать функцию отложенного сообщения в SpringBoot, и в этот раз мы по-прежнему используем сцену оформления заказа на товары. Например, если пользователь оформляет заказ, если он не оплатил заказ в течение 60 минут, то заказ будет аннулирован — это типичный сценарий использования отложенных сообщений.

  • Сначала нам нужноpom.xmlфайл добавленAMQPсвязанные зависимости;
<!--消息队列相关依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • позжеapplication.ymlДобавить соответствующую конфигурацию RabbitMQ;
spring:
  rabbitmq:
    host: localhost # rabbitmq的连接地址
    port: 5672 # rabbitmq的连接端口号
    virtual-host: /mall # rabbitmq的虚拟host
    username: mall # rabbitmq的用户名
    password: mall # rabbitmq的密码
    publisher-confirms: true #如果对异步消息需要回调必须设置为true
  • Затем создайте конфигурацию Java RabbitMQ, которая в основном используется для настройки переключателей, очередей и отношений привязки;
/**
 * 消息队列配置
 * Created by macro on 2018/9/14.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 订单延迟插件消息队列所绑定的交换机
     */
    @Bean
    CustomExchange  orderPluginDirect() {
        //创建一个自定义交换机,可以发送延迟消息
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
    }

    /**
     * 订单延迟插件队列
     */
    @Bean
    public Queue orderPluginQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
    }

    /**
     * 将订单延迟插件队列绑定到交换机
     */
    @Bean
    public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
        return BindingBuilder
                .bind(orderPluginQueue)
                .to(orderPluginDirect)
                .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                .noargs();
    }

}
  • Создайте отправителя сообщения об отмене заказа, установив сообщениеx-delayзаголовок для установки времени задержки отправки сообщений с биржи в очередь;
/**
 * 取消订单消息的发出者
 * Created by macro on 2018/9/14.
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setHeader("x-delay",delayTimes);
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}
  • Создайте приемник для сообщений об отмене заказа для обработки сообщений в очереди плагина задержки заказа.
/**
 * 取消订单消息的处理者
 * Created by macro on 2018/9/14.
 */
@Component
@RabbitListener(queues = "mall.order.cancel.plugin")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}
  • Затем добавьте следующую логику в наш класс бизнес-реализации заказа: перед тем, как заказ будет успешным, в очередь сообщений отправляется отложенное сообщение об отмене заказа, чтобы, если заказ не оплачен, заказ можно было отменить;
/**
 * 前台订单管理Service
 * Created by macro on 2018/8/30.
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 执行一系类下单操作,具体参考mall项目
        LOGGER.info("process generateOrder");
        //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下单成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 执行一系类取消订单操作,具体参考mall项目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //获取订单超时时间,假设为60分钟(测试用的30秒)
        long delayTimes = 30 * 1000;
        //发送延迟消息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • После запуска проекта вызовите интерфейс заказа в Swagger;

  • Глядя на журнал консоли после завершения вызова, можно обнаружить, что существует разница между отправкой сообщения и получением сообщения.30s, время задержки, которое мы устанавливаем.
2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

Сравнение двух реализаций

Ранее мы использовали метод очереди недоставленных сообщений, здесь мы сравним два метода и поговорим о принципах реализации этих двух методов.

очередь недоставленных сообщений

Очередь недоставленных сообщений — это очередь. Если сообщение отправлено в очередь и превышает установленное время, оно будет перенаправлено в установленную очередь для обработки сообщения об истечении времени ожидания. С помощью этой функции сообщение может быть задержано.

Плагин задержки

Установив плагины и настроив коммутатор, коммутатор имеет возможность задерживать отправку сообщений, тем самым реализуя отложенные сообщения.

в заключении

Поскольку для метода очереди недоставленных сообщений необходимо создать два переключателя (переключатель очереди недоставленных сообщений + переключатель очереди обработки), две очереди (очередь недоставленных сообщений + очередь обработки), а для метода подключаемого модуля задержки необходимо создать только один переключатель и одну очередь, так что последний использует Это проще.

Адрес исходного кода проекта

GitHub.com/macro-positive/…

публика

проект торгового центраПолный набор учебных пособий сериализуется,Обратите внимание на общедоступный номерПолучите это прямо сейчас.

公众号图片