Очередь задержки RabbitMQ

RabbitMQ

Что такое RabbitMQ

С десятками тысяч пользователей RabbitMQ является одним из самых популярных брокеров сообщений с открытым исходным кодом. От T-Mobile до Runtastic RabbitMQ используется небольшими стартапами и крупными предприятиями по всему миру.

RabbitMQ легковесен и прост в развертывании локально и в облаке. Он поддерживает несколько протоколов обмена сообщениями. RabbitMQ можно развернуть в распределенных и федеративных конфигурациях для удовлетворения потребностей в масштабируемости и высокой доступности.

RabbitMQ работает во многих операционных системах и облачных средах и предоставляет обширные инструменты разработки для большинства популярных языков.

Вышеприведенный контент переведен с оригинального текста официального сайта Адрес официального сайта:www.rabbitmq.com/

Что такое отложенная очередь

Очередь с задержкой также является очередью сообщений, но это очередь сообщений с функцией задержки. Проще говоря, в реальной разработке для некоторых бизнес-сценариев нам нужно позволить сообщениям в очереди потребляться потребителями в указанное время, например:

  • Сценарий 1: Заказ будет автоматически отменен, если заказ не будет оплачен в течение определенного периода времени после размещения заказа.
  • Сценарий 2: Сценарии с задержкой доставки, такие как T+d (задержка на рабочий день) или D+d (естественная задержка на день)
  • Сценарий 3: Новый пользователь не делает заказ в течение месяца после регистрации и отправляет смс, чтобы соблазнить волну

Некоторые люди могут спросить, в приведенной выше ситуации я также могу достичь цели, запустив процесс опроса задач по времени. Да, он действительно может достичь цели, но если объем данных этого вида бизнеса велик, его будет очень сложно обрабатывать, что будет сильно нагружать сервер, и будут большие ошибки при опросе. Таких проблем можно избежать, если использовать отложенную очередь.

Сам Rabbitmq напрямую не поддерживает очереди задержки.Очереди задержки RabbitMQ реализованы на основе сообщения TTL (время жизни) и обмена недоставленными письмами DLE (обмен недоставленными письмами):

  1. TTL: RabbitMQ может установить время выживания для очереди и сообщения соответственно.Правило — это меньшее значение из двух, то есть время истечения срока действия сообщения, когда очередь не имеет соединения с потребителем, или время истечения срока действия, когда сообщение не было израсходовано в очереди.
  2. DLE: просроченные сообщения направляются в указанную очередь недоставленных сообщений через связанный переключатель недоставленных сообщений, и потребители фактически потребляют сообщения в очереди недоставленных сообщений.

Этим достигается эффект задержки.

Установите RabbitMQ

В этой статье в качестве примера используется установка среды Linux:

Поскольку rabbitmq написан на erlang, требуется среда erlang.

Установить среду erlang

  1. Установите GCC GCC-C++ Openssl

    yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

  2. установить нкурсес

    yum -y install ncurses-devel

  3. установить эрланг

    wget http://erlang.org/download/otp_src_18.2.1.tar.gz
    tar xvfz otp_src_18.2.1.tar.gz 
    ./configure 
    make install
    

Установите RabbitMQ

wget GitHub.com/кролик присутствует/люди…

После установки таким способом файл в формате xz, и нужно распаковать xz

xz -d  rabbitmq-server-generic-unix-3.8.3.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.8.3.tar

Если инструмент декомпрессии xz отсутствует, сначала необходимо установить инструмент xz.

yum install xz

Скопируйте распакованные файлы в /usr/local/

cp -r rabbitmq_server-3.8.3 /usr/local/rabbitmq

Изменить файл профиляvim /etc/profileДобавьте следующее

export PATH=/usr/local/rabbitmq/sbin:$PATH

воплощать в жизньsource /etc/profileсделать изменения вступившими в силу

Откройте режим управления rabbitmq

rabbitmq-plugins enable rabbitmq_management   #启动后台管理界面
rabbitmq-server -detached   #后台运行rabbitmq

Пользователь-гость не может получить доступ по умолчанию и должен изменить разрешения.

Добавить пользователя: rabbitmqctl add_user admin admin

Добавляем разрешения: rabbitmqctl set_permissions -p"/"pwd123"." "." ".*"

Изменить роли пользователей: rabbitmqctl set_user_tags admin администратор

Здесь вы можете увидеть следующий интерфейс, обратившись к ip:15672.

Успешный вход в систему с вновь созданным пользователем. На этом установка rabbitmq завершена. Если вы используете Alibaba Cloud Server, не забудьте открыть информацию о порте группы безопасности.

Установите плагин задержки

cd /usr/local/rabbitmq/pluginsПерейдите в каталог плагина

скачать плагин задержки rabbiimq

Адрес плагина:woohoo.rabbitcurrent.com/community-afraid…

оказатьсяrabbitmq_delayed_message_exchangeВерсия выбора плагина

Щелкните правой кнопкой мыши формат .ez, чтобы скопировать адрес ссылки, чтобы получить адрес загрузки.

выполнить в текущем каталоге

wget GitHub.com/кролик присутствует/люди… bbitmq_delayed_message_exchange-3.8.0.ez

Включить плагин задержки

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Проверить результаты установки плагина

Если вы видите, что тип может быть выбран как тип x-delayed-message, это означает, что плагин задержки установлен успешно.

springboot2.0+ использует RabbitMQ

Войдите на официальный сайт spring и создайте структуру проекта springboot через официальный сайт. ps: Также можно использовать идею, эта статья знакомит с формой официального сайта весны

start.spring.io/

В этой статье выбраны зависимости, которые будут использоваться позже. Нажмите «Создать» ниже, чтобы сгенерировать проект, разархивировать его в рабочую среду Java и открыть с помощью idea.

простая очередь

  1. Добавить информацию о файле конфигурации rabbitmq

    spring.application.name=rabbitmq-delay-demo
    server.port=8081
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=用户
    spring.rabbitmq.password=连接密码
    
  2. Написать класс конфигурации rabbitmq

    package com.yezi.rabbitmqdelaydemo.config;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 初始化一个测试队列
         *
         * @return
         */
        @Bean
        public Queue helloQueue() {
            return new Queue("test.mq");
        }
    }
    
    
  3. Напишите класс отправки сообщений

    package com.yezi.rabbitmqdelaydemo.mq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    @Component
    public class Sender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "测试发送消息 " + LocalDateTime.now();
            System.out.println("Sender : " + context);
            this.rabbitTemplate.convertAndSend("test.mq", context);
        }
    
    }
    
    
  4. Напишите класс приема сообщений

    package com.yezi.rabbitmqdelaydemo.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "test.mq")
    public class Receiver {
    
        @RabbitHandler
        public void process(String context) {
            System.out.println("Receiver : " + context);
        }
    
    }
    
  5. тестовая отправка

    package com.yezi.rabbitmqdelaydemo;
    
    import com.yezi.rabbitmqdelaydemo.mq.Sender;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitmqDelayDemoApplicationTests {
    
    	@Autowired
    	private Sender sender;
    
    	@Test
    	void contextLoads() {
    		sender.send();
    	}
    
    }
    
    
  6. Результаты теста

Приведенный выше тест простого сообщения прошел успешно.

очередь задержки

  1. Напишите класс конфигурации

    package com.yezi.rabbitmqdelaydemo.config;
    
    import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedTopic;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitMqDelayedConfig {
    
        /**
         * 初始化延时队列
         *
         * @return
         */
        @Bean
        public Queue delayedQueue() {
            return new Queue(DelayedTopic.DELAYED_QUEUE_NAME);
        }
    
        /**
         * 定义一个延迟交换机
         *
         * @return
         */
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DelayedTopic.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
        }
    
        /**
         * 绑定队列到这个延迟交换机上
         *
         * @param queue
         * @param customExchange
         * @return
         */
        @Bean
        public Binding bindingNotify(@Qualifier("delayedQueue") Queue queue,
                                     @Qualifier("delayExchange") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    

    Напишите класс определения ключевого слова

    package com.yezi.rabbitmqdelaydemo.mq.delayed;
    
    public interface DelayedTopic {
        String DELAYED_EXCHANGE_NAME = "delay_exchange";
        String DELAYED_QUEUE_NAME = "delayed.queue";
        String DELAYED_ROUTING_KEY = "delayed.queue.routingkey";
    }
    
    
  2. написать сообщение отправителю

    package com.yezi.rabbitmqdelaydemo.mq.delayed;
    
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    @Component
    public class DelayedSender {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
    
        public void delayedMessage() {
            String context = "test delay message";
            System.out.println("Send time: " + LocalDateTime.now() + "  Send: " + context);
            //延时时间6秒
            rabbitTemplate.convertAndSend(DelayedTopic.DELAYED_EXCHANGE_NAME, DelayedTopic.DELAYED_ROUTING_KEY, context, a -> {
                a.getMessageProperties().setDelay(6000);
                return a;
            });
        }
    
    
    }
    
    
  3. Напишите сообщение потребителю

    package com.yezi.rabbitmqdelaydemo.mq.delayed;
    
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.time.LocalDateTime;
    
    @Component
    public class DelayedReceiver {
    
        @RabbitListener(queues = DelayedTopic.DELAYED_QUEUE_NAME)
        public void receive(Message message, Channel channel) throws IOException {
            String s = new String(message.getBody());
            System.out.println("Received time: " + LocalDateTime.now() + "  Received: " + s);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    }
    
    
  4. тестовая отправка

    package com.yezi.rabbitmqdelaydemo.controller;
    
    
    import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class DemoController {
    
        @Autowired
        private DelayedSender delayedSender;
    
        /**
         * 测试发送延时消息
         *
         * @return
         */
        @GetMapping("/delayedSender")
        public String delayedSender() {
            delayedSender.delayedMessage();
            return "ojbk";
        }
    }
    
    

  1. Результаты проверки

Пока видно, что сообщение получено через 6 секунд, а задержанное сообщение отправлено и получено успешно.

Способ SCS реализовать очередь задержки

Что такое СКС

SCS — это сокращение от Spring Cloud Stream, фреймворка для создания хорошо масштабируемых управляемых событиями микросервисов, подключенных к общим системам обмена сообщениями. Spring Cloud Stream предоставляет персонализированные реализации автоматической конфигурации для продуктов промежуточного программного обеспечения для обмена сообщениями некоторых поставщиков и представляет три основные концепции публикации-подписки, групп потребителей и разделения. Используя Spring Cloud Stream, разработчики могут эффективно упростить использование промежуточного программного обеспечения для обработки сообщений, чтобы системные разработчики могли больше сосредоточиться на обработке основной бизнес-логики.

Очередь задержки SCS
  1. Напишите конфигурацию канала

    package com.yezi.scsdemo.mq;
    
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;
    
    public interface DelayedSink {
        String OUTPUT = "delayed-topic-output";
        String INPUT = "delayed-topic-input";
    
        @Output(OUTPUT)
        MessageChannel output();
    
        @Input(INPUT)
        SubscribableChannel input();
    }
    
    
  2. написать сообщение отправителю

    package com.yezi.scsdemo.mq;
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    @Component
    public class DelayedSender {
    
        @Autowired
        private DelayedSink delayedSink;
    
    
        public void delayedMessage() {
            String context = "test delay message";
            System.out.println("Send time: " + LocalDateTime.now() + "  Send: " + context);
            //延时时间20秒
            delayedSink.output().send(MessageBuilder.withPayload(context).setHeader("x-delay", 20000).build());
        }
    
    
    }
    
    
  3. Напишите сообщение потребителю

    package com.yezi.scsdemo.mq;
    
    
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    
    @EnableBinding(DelayedSink.class)
    @Component
    public class DelayedReceiver {
    
        @StreamListener(DelayedSink.INPUT)
        public void receive(String message) {
            System.out.println("Received time: " + LocalDateTime.now() + "  Received: " + message);
        }
    
    
    }
    
    

    Существует несколько основных аннотаций SCS. Конкретный контент можно найти в этом блоге. Здесь он не будет расширяться.

    blog.brotherspace.com/spring-ugly…Автор: Программист ДД

  4. Изменить файл конфигурацииapplication.propertiesДобавьте следующую информацию:

    server.port=8080
    spring.application.name=rabbitmq-delay-scs-demo
    
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=账号
    spring.rabbitmq.password=密码
    
    #以下为生产者端配置
    #将发送者队列绑定到指定交换机
    spring.cloud.stream.bindings.delayed-topic-output.destination=delayed-topic-demo
    #开启延时,生产者和消费者端都需要开启这个配置
    spring.cloud.stream.rabbit.bindings.delayed-topic-output.producer.delayed-exchange=true
    
    #以下为消费者端配置
    #将消费者队列绑定到指定交换机
    spring.cloud.stream.bindings.delayed-topic-input.destination=delayed-topic-demo
    #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息
    spring.cloud.stream.bindings.delayed-topic-input.group=group-1
    #开启延时,生产者和消费者端都需要开启这个配置
    spring.cloud.stream.rabbit.bindings.delayed-topic-input.consumer.delayed-exchange=true
    
  5. Вставьте сюда зависимость файла pom

      <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
        </dependencies>
    
  6. тестовая отправка

    package com.yezi.scsdemo.controller;
    
    
    import com.yezi.scsdemo.mq.DelayedSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class DemoController {
    
        @Autowired
        private DelayedSender delayedSender;
    
        /**
         * 测试发送延时消息
         *
         * @return
         */
        @GetMapping("/scs/delayedSender")
        public String delayedSender() {
            delayedSender.delayedMessage();
            return "scs ojbk";
        }
    }
    
    

  1. Результаты проверки

Здесь мы видим, что пришло сообщение. Так как мы установили время задержки 20 секунд при отправке, подождав 20 секунд, просмотрите информацию в консоли идеи

Видно, что через 20 секунд сообщение потребляется, и вышеприведенный тест сообщения о задержке SCS прошел успешно.

резюме

В целом, по сравнению с использованием конфигурационных классов для реализации отложенных сообщений, SCS имеет более лаконичный код и не требует привязки переключателей, нужно только реализовать привязку очередей и переключателей в виде конфигурации в свойствах или yml Пользователи могут больше сосредоточиться на бизнесе при использовании SCS!