Что такое RabbitMQ
С десятками тысяч пользователей RabbitMQ является одним из самых популярных брокеров сообщений с открытым исходным кодом. От T-Mobile до Runtastic RabbitMQ используется небольшими стартапами и крупными предприятиями по всему миру.
RabbitMQ легковесен и прост в развертывании локально и в облаке. Он поддерживает несколько протоколов обмена сообщениями. RabbitMQ можно развернуть в распределенных и федеративных конфигурациях для удовлетворения потребностей в масштабируемости и высокой доступности.
RabbitMQ работает во многих операционных системах и облачных средах и предоставляет обширные инструменты разработки для большинства популярных языков.
Вышеприведенный контент переведен с оригинального текста официального сайта Адрес официального сайта:www.rabbitmq.com/
Что такое отложенная очередь
Очередь с задержкой также является очередью сообщений, но это очередь сообщений с функцией задержки. Проще говоря, в реальной разработке для некоторых бизнес-сценариев нам нужно позволить сообщениям в очереди потребляться потребителями в указанное время, например:
- Сценарий 1: Заказ будет автоматически отменен, если заказ не будет оплачен в течение определенного периода времени после размещения заказа.
- Сценарий 2: Сценарии с задержкой доставки, такие как T+d (задержка на рабочий день) или D+d (естественная задержка на день)
- Сценарий 3: Новый пользователь не делает заказ в течение месяца после регистрации и отправляет смс, чтобы соблазнить волну
Некоторые люди могут спросить, в приведенной выше ситуации я также могу достичь цели, запустив процесс опроса задач по времени. Да, он действительно может достичь цели, но если объем данных этого вида бизнеса велик, его будет очень сложно обрабатывать, что будет сильно нагружать сервер, и будут большие ошибки при опросе. Таких проблем можно избежать, если использовать отложенную очередь.
Сам Rabbitmq напрямую не поддерживает очереди задержки.Очереди задержки RabbitMQ реализованы на основе сообщения TTL (время жизни) и обмена недоставленными письмами DLE (обмен недоставленными письмами):
- TTL: RabbitMQ может установить время выживания для очереди и сообщения соответственно.Правило — это меньшее значение из двух, то есть время истечения срока действия сообщения, когда очередь не имеет соединения с потребителем, или время истечения срока действия, когда сообщение не было израсходовано в очереди.
- DLE: просроченные сообщения направляются в указанную очередь недоставленных сообщений через связанный переключатель недоставленных сообщений, и потребители фактически потребляют сообщения в очереди недоставленных сообщений.
Этим достигается эффект задержки.
Установите RabbitMQ
В этой статье в качестве примера используется установка среды Linux:
Поскольку rabbitmq написан на erlang, требуется среда erlang.
Установить среду erlang
-
Установите GCC GCC-C++ Openssl
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
-
установить нкурсес
yum -y install ncurses-devel
-
установить эрланг
wget http://erlang.org/download/otp_src_18.2.1.tar.gz tar xvfz otp_src_18.2.1.tar.gz ./configure make install
Установите RabbitMQ
После установки таким способом файл в формате xz, и нужно распаковать xz
xz -d rabbitmq-server-generic-unix-3.8.3.tar.xz tar -xvf rabbitmq-server-generic-unix-3.8.3.tar
Если инструмент декомпрессии xz отсутствует, сначала необходимо установить инструмент xz.
yum install xz
Скопируйте распакованные файлы в /usr/local/
cp -r rabbitmq_server-3.8.3 /usr/local/rabbitmq
Изменить файл профиляvim /etc/profile
Добавьте следующее
export PATH=/usr/local/rabbitmq/sbin:$PATH
воплощать в жизньsource /etc/profile
сделать изменения вступившими в силу
Откройте режим управления rabbitmq
rabbitmq-plugins enable rabbitmq_management #启动后台管理界面 rabbitmq-server -detached #后台运行rabbitmq
Пользователь-гость не может получить доступ по умолчанию и должен изменить разрешения.
Добавить пользователя: rabbitmqctl add_user admin admin
Добавляем разрешения: rabbitmqctl set_permissions -p"/"pwd123"." "." ".*"
Изменить роли пользователей: rabbitmqctl set_user_tags admin администратор
Здесь вы можете увидеть следующий интерфейс, обратившись к ip:15672.
Успешный вход в систему с вновь созданным пользователем. На этом установка rabbitmq завершена. Если вы используете Alibaba Cloud Server, не забудьте открыть информацию о порте группы безопасности.
Установите плагин задержки
cd /usr/local/rabbitmq/plugins
Перейдите в каталог плагина
скачать плагин задержки rabbiimq
Адрес плагина:woohoo.rabbitcurrent.com/community-afraid…
оказатьсяrabbitmq_delayed_message_exchangeВерсия выбора плагина
Щелкните правой кнопкой мыши формат .ez, чтобы скопировать адрес ссылки, чтобы получить адрес загрузки.
выполнить в текущем каталоге
wget GitHub.com/кролик присутствует/люди… bbitmq_delayed_message_exchange-3.8.0.ez
Включить плагин задержки
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Проверить результаты установки плагина
Если вы видите, что тип может быть выбран как тип x-delayed-message, это означает, что плагин задержки установлен успешно.
springboot2.0+ использует RabbitMQ
Войдите на официальный сайт spring и создайте структуру проекта springboot через официальный сайт. ps: Также можно использовать идею, эта статья знакомит с формой официального сайта весны
В этой статье выбраны зависимости, которые будут использоваться позже. Нажмите «Создать» ниже, чтобы сгенерировать проект, разархивировать его в рабочую среду Java и открыть с помощью idea.
простая очередь
-
Добавить информацию о файле конфигурации rabbitmq
spring.application.name=rabbitmq-delay-demo server.port=8081 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=用户 spring.rabbitmq.password=连接密码
-
Написать класс конфигурации rabbitmq
package com.yezi.rabbitmqdelaydemo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { /** * 初始化一个测试队列 * * @return */ @Bean public Queue helloQueue() { return new Queue("test.mq"); } }
-
Напишите класс отправки сообщений
package com.yezi.rabbitmqdelaydemo.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "测试发送消息 " + LocalDateTime.now(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("test.mq", context); } }
-
Напишите класс приема сообщений
package com.yezi.rabbitmqdelaydemo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "test.mq") public class Receiver { @RabbitHandler public void process(String context) { System.out.println("Receiver : " + context); } }
-
тестовая отправка
package com.yezi.rabbitmqdelaydemo; import com.yezi.rabbitmqdelaydemo.mq.Sender; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RabbitmqDelayDemoApplicationTests { @Autowired private Sender sender; @Test void contextLoads() { sender.send(); } }
-
Результаты теста
Приведенный выше тест простого сообщения прошел успешно.
очередь задержки
-
Напишите класс конфигурации
package com.yezi.rabbitmqdelaydemo.config; import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedTopic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMqDelayedConfig { /** * 初始化延时队列 * * @return */ @Bean public Queue delayedQueue() { return new Queue(DelayedTopic.DELAYED_QUEUE_NAME); } /** * 定义一个延迟交换机 * * @return */ @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DelayedTopic.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } /** * 绑定队列到这个延迟交换机上 * * @param queue * @param customExchange * @return */ @Bean public Binding bindingNotify(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DelayedTopic.DELAYED_ROUTING_KEY).noargs(); } }
Напишите класс определения ключевого слова
package com.yezi.rabbitmqdelaydemo.mq.delayed; public interface DelayedTopic { String DELAYED_EXCHANGE_NAME = "delay_exchange"; String DELAYED_QUEUE_NAME = "delayed.queue"; String DELAYED_ROUTING_KEY = "delayed.queue.routingkey"; }
-
написать сообщение отправителю
package com.yezi.rabbitmqdelaydemo.mq.delayed; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @Component public class DelayedSender { @Autowired private AmqpTemplate rabbitTemplate; public void delayedMessage() { String context = "test delay message"; System.out.println("Send time: " + LocalDateTime.now() + " Send: " + context); //延时时间6秒 rabbitTemplate.convertAndSend(DelayedTopic.DELAYED_EXCHANGE_NAME, DelayedTopic.DELAYED_ROUTING_KEY, context, a -> { a.getMessageProperties().setDelay(6000); return a; }); } }
-
Напишите сообщение потребителю
package com.yezi.rabbitmqdelaydemo.mq.delayed; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.LocalDateTime; @Component public class DelayedReceiver { @RabbitListener(queues = DelayedTopic.DELAYED_QUEUE_NAME) public void receive(Message message, Channel channel) throws IOException { String s = new String(message.getBody()); System.out.println("Received time: " + LocalDateTime.now() + " Received: " + s); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
-
тестовая отправка
package com.yezi.rabbitmqdelaydemo.controller; import com.yezi.rabbitmqdelaydemo.mq.delayed.DelayedSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class DemoController { @Autowired private DelayedSender delayedSender; /** * 测试发送延时消息 * * @return */ @GetMapping("/delayedSender") public String delayedSender() { delayedSender.delayedMessage(); return "ojbk"; } }
- Результаты проверки
Пока видно, что сообщение получено через 6 секунд, а задержанное сообщение отправлено и получено успешно.
Способ SCS реализовать очередь задержки
Что такое СКС
SCS — это сокращение от Spring Cloud Stream, фреймворка для создания хорошо масштабируемых управляемых событиями микросервисов, подключенных к общим системам обмена сообщениями. Spring Cloud Stream предоставляет персонализированные реализации автоматической конфигурации для продуктов промежуточного программного обеспечения для обмена сообщениями некоторых поставщиков и представляет три основные концепции публикации-подписки, групп потребителей и разделения. Используя Spring Cloud Stream, разработчики могут эффективно упростить использование промежуточного программного обеспечения для обработки сообщений, чтобы системные разработчики могли больше сосредоточиться на обработке основной бизнес-логики.
Очередь задержки SCS
-
Напишите конфигурацию канала
package com.yezi.scsdemo.mq; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface DelayedSink { String OUTPUT = "delayed-topic-output"; String INPUT = "delayed-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }
-
написать сообщение отправителю
package com.yezi.scsdemo.mq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @Component public class DelayedSender { @Autowired private DelayedSink delayedSink; public void delayedMessage() { String context = "test delay message"; System.out.println("Send time: " + LocalDateTime.now() + " Send: " + context); //延时时间20秒 delayedSink.output().send(MessageBuilder.withPayload(context).setHeader("x-delay", 20000).build()); } }
-
Напишите сообщение потребителю
package com.yezi.scsdemo.mq; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @EnableBinding(DelayedSink.class) @Component public class DelayedReceiver { @StreamListener(DelayedSink.INPUT) public void receive(String message) { System.out.println("Received time: " + LocalDateTime.now() + " Received: " + message); } }
Существует несколько основных аннотаций SCS. Конкретный контент можно найти в этом блоге. Здесь он не будет расширяться.
blog.brotherspace.com/spring-ugly…Автор: Программист ДД
-
Изменить файл конфигурации
application.properties
Добавьте следующую информацию:server.port=8080 spring.application.name=rabbitmq-delay-scs-demo spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=账号 spring.rabbitmq.password=密码 #以下为生产者端配置 #将发送者队列绑定到指定交换机 spring.cloud.stream.bindings.delayed-topic-output.destination=delayed-topic-demo #开启延时,生产者和消费者端都需要开启这个配置 spring.cloud.stream.rabbit.bindings.delayed-topic-output.producer.delayed-exchange=true #以下为消费者端配置 #将消费者队列绑定到指定交换机 spring.cloud.stream.bindings.delayed-topic-input.destination=delayed-topic-demo #消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息 spring.cloud.stream.bindings.delayed-topic-input.group=group-1 #开启延时,生产者和消费者端都需要开启这个配置 spring.cloud.stream.rabbit.bindings.delayed-topic-input.consumer.delayed-exchange=true
-
Вставьте сюда зависимость файла pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>
-
тестовая отправка
package com.yezi.scsdemo.controller; import com.yezi.scsdemo.mq.DelayedSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class DemoController { @Autowired private DelayedSender delayedSender; /** * 测试发送延时消息 * * @return */ @GetMapping("/scs/delayedSender") public String delayedSender() { delayedSender.delayedMessage(); return "scs ojbk"; } }
- Результаты проверки
Здесь мы видим, что пришло сообщение. Так как мы установили время задержки 20 секунд при отправке, подождав 20 секунд, просмотрите информацию в консоли идеи
Видно, что через 20 секунд сообщение потребляется, и вышеприведенный тест сообщения о задержке SCS прошел успешно.
резюме
В целом, по сравнению с использованием конфигурационных классов для реализации отложенных сообщений, SCS имеет более лаконичный код и не требует привязки переключателей, нужно только реализовать привязку очередей и переключателей в виде конфигурации в свойствах или yml Пользователи могут больше сосредоточиться на бизнесе при использовании SCS!