Для очень подробного ознакомления с RabbitMQ этого достаточно!

Java
Для очень подробного ознакомления с RabbitMQ этого достаточно!

карта разума

1. Что такое очередь сообщений

ИнформацияОтносится к данным, передаваемым между двумя приложениями. Типы данных бывают разных форм и могут содержать только текстовые строки или внедренные объекты.

«Очередь сообщений» — это контейнер, в котором хранятся сообщения во время их передачи.. В очереди сообщений обычно есть две роли производителя и потребителя. Производитель отвечает только за отправку данных в очередь сообщений, и ему все равно, кто выносит обработку данных из очереди сообщений. Потребитель отвечает только за обработку данных из очереди сообщений, ему все равно, кто отправил данные.

2. Зачем использовать очереди сообщений

Есть три основные функции:

  • разъединение. как показано на рисунке. Предполагая, что всем системам B, C и D нужны данные из системы A, система A вызывает три метода для отправки данных в B, C и D. В настоящее время система D больше не нужна, поэтому соответствующий код необходимо удалить в системе A. Предположим, что есть новая система E, которой в данный момент нужны данные, а затем системе A нужно добавить код, вызывающий систему E. Чтобы уменьшить эту сильную связь, вы можете использовать MQ,Системе А нужно только отправлять данные в MQ. Если другим системам нужны данные, они могут получить их из MQ..

  • асинхронный. как показано на рисунке. Когда клиентский запрос отправлен, система A вызовет три системы системы B, C и D. Если запрос синхронизирован, время ответа равно сумме систем A, B, C и D, что составляет 800 мс. .Если используется MQ, система A отправляет данные в MQ, а затем может вернуть ответ клиенту, не дожидаясь ответов от систем B, C и D, что может значительно повысить производительность.. Для некоторых второстепенных услуг, таких как отправка текстовых сообщений, отправка электронных писем и т. д., можно использовать MQ.

  • Отсечение пиков. как показано на рисунке. На самом деле это очень важное приложение MQ. Предположим, что количество запросов системы А резко возрастает в определенный период времени, и отправлено 5000 запросов.В это время система А отправит в MySQL на выполнение 5000 SQL-запросов.Конечно, MySQL не может обработать такой огромный запрос, и MySQL рухнет, что приведет к краху системы.Если используется MQ, система A больше не отправляет SQL напрямую в базу данных, а отправляет данные в MQ. Для MQ допустимо иметь невыполненные данные в течение короткого периода времени, и тогда потребители будут извлекать 2000 элементов для обработки каждого время для предотвращения пиковых запросов Период большое количество запросов отправляется непосредственно в MySQL, что приводит к сбою системы.

3. Возможности RabbitMQ

RabbitMQ — это промежуточное ПО для сообщений с открытым исходным кодом, разработанное с использованием языка Erlang и реализующее AMQP (Advanced Message Queuing Protocol). Прежде всего, вам нужно знать некоторые особенности RabbitMQ,Официальный сайтДоступно для проверки:

  • надежность. Поддержка сохранения, подтверждения передачи, подтверждения выпуска и т. д. обеспечивает надежность MQ.
  • Гибкая стратегия рассылки сообщений. Это должно быть основной функцией RabbitMQ. Сообщение маршрутизируется Exchange (обменом) до того, как сообщение войдет в MQ. Стратегии распространения сообщений включают: простой режим, режим рабочей очереди, режим публикации-подписки, режим маршрутизации и режим подстановочных знаков.
  • Кластеризация поддерживается. Несколько серверов RabbitMQ могут формировать кластер для создания логического брокера.
  • Различные протоколы. RabbitMQ поддерживает различные протоколы очередей сообщений, такие как STOMP, MQTT и другие.
  • Поддерживает несколько языковых клиентов. RabbitMQ поддерживает почти все распространенные языки программирования, включая Java, .NET, Ruby и другие.
  • Визуальный интерфейс управления. RabbitMQ предоставляет простой в использовании пользовательский интерфейс, который позволяет пользователям отслеживать брокеры сообщений и управлять ими.
  • Механизм плагина. RabbitMQ предоставляет множество плагинов, которые можно расширить с помощью плагинов, или вы можете написать свои собственные плагины.

В-четвертых, первый опыт RabbitMQ

4.1 Установите RabbitMQ (система Win10)

Так как это только для учебных нужд, лень открывать виртуальную машину, когда она установлена ​​в системе win10. Если вы устанавливаете его в системе Linux, я рекомендую использовать Docker для извлечения образа RabbitMQ, что будет более удобно.

4.1.1 Установите язык erLang и настройте переменные среды

первый в эрлангеОфициальный сайтЗагрузите версию установочного пакета для win10.

После скачивания вы получаете вот это:

Затем дважды щелкните установку, и все время нажимайте «Далее» (Next).После завершения установки настройте переменные среды.

С помощью команды cmd введите erl -version для проверки:

4.1.2 Установка сервера RabbitMQ

в RabbitMQпроект gitHub, загрузите установочный пакет сервера версии для Windows.

После скачивания вы получаете вот это:

Затем дважды щелкните, чтобы установить, и нажмите Далее, чтобы установить.После завершения установки найдите каталог установки:

Откройте команду cmd в этом каталоге и введите команду rabbitmq-plugins enable rabbitmq_management, чтобы установить плагин для страницы управления:

Затем дважды щелкните rabbitmq-server.bat, чтобы запустить сценарий, а затем откройте управление службами, чтобы увидеть, что RabbitMQ работает:

В этот момент откройте браузер и введитеhttp://localhost:15672, пароль учетной записи по умолчанию: guest/guest

На этом установка завершена!

4.2 Навсегда Привет Слово

После того, как сервер построен, клиент должен использоваться для его работы.Далее мы будем использовать Java, чтобы сделать простую демонстрацию HelloWord.

Поскольку я использую SpringBoot, поэтому всторона производителяПросто добавьте соответствующие стартовые зависимости:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Как правило, необходимо создать общий проект для совместного использования некоторой конфигурации, такой как темы очередей, имена обмена, имена ключей сопоставления маршрутов и т. д.

Сначала добавьте информацию о конфигурации RabbitMQ в файл application.yml:

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest

Затем на стороне производителя добавьте зависимость maven общего пакета, а затем создайте класс конфигурации для прямого переключателя и очереди:

@Configuration
public class DirectRabbitConfig {
    @Bean
    public Queue rabbitmqDemoDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
    }
    
    @Bean
    public DirectExchange rabbitmqDemoDirectExchange() {
        //Direct交换机
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitmqDemoDirectQueue())
                //到交换机
                .to(rabbitmqDemoDirectExchange())
                //并设置匹配键
                .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
    }
}

Затем создайте класс службы, который отправляет сообщения:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    //日期格式化
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
            String sendTime = sdf.format(new Date());
            Map<String, Object> map = new HashMap<>();
            map.put("msgId", msgId);
            map.put("sendTime", sendTime);
            map.put("msg", msg);
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Затем разместите его там, где он должен использоваться в соответствии с бизнесом, например, в запланированных задачах или интерфейсах. Здесь я просто буду использовать слой Controller для отправки:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    /**
     * 发送消息
     * @author java技术爱好者
     */
    @PostMapping("/sendMsg")
    public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsg(msg);
    }
}

После того, как производитель закончит писать, напишите код на стороне потребителя, а потребитель очень прост. maven, конфигурация файла yml такая же, как у производителя. Просто создайте класс и напишите имя очереди прослушивания в аннотации @RabbitListener, как показано на рисунке:

Есть небольшая дыра, очередь не была создана на сервере RabbitMQ в начале:

В это время, если потребитель запущен, будет сообщено об ошибке:

Чтобы сначала запустить производителя, отправьте сообщение:

Наконец, снова запустите потребителя и потребляйте:

В это время сообщение очереди будет постоянно отслеживаться.Пока производитель отправляет сообщение в MQ, потребитель будет потреблять его. Я пытаюсь отправить 4 здесь:

Поскольку очереди не существует, сообщается о проблеме запуска ошибки потребителя. Лучше всего, чтобы и производитель, и потребитель пытались создать очередь. Как это написать? Есть много способов. Я использую относительно простой здесь:

Класс конфигурации производителя добавляет что-то:

//实现BeanPostProcessor类,使用Bean的生命周期函数
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    //这是创建交换机和队列用的rabbitAdmin对象
    @Resource
    private RabbitAdmin rabbitAdmin;
    
    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    
    //实例化bean后,也就是Bean的后置处理器
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //创建交换机
        rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
        //创建队列
        rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue());
        return null;
    }
}

Таким образом, запуск производителя автоматически создаст обмен и очередь, не дожидаясь отправки сообщения.

Потребителю нужно добавить небольшой код:

@Component
//使用queuesToDeclare属性,如果不存在则会创建队列
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
    //...省略
}

Таким образом, не возникнет проблем, кто начинает первым: производитель или потребитель~

Кодовый адрес:GitHub.com/yehongqin/no…

В-пятых, компоненты RabbitMQ

Из приведенного выше примера HelloWord мы, вероятно, можем увидеть некоторые из них, то есть состав RabbitMQ, который состоит из следующих частей:

  • Брокер: процесс службы очереди сообщений. Этот процесс состоит из двух частей: Exchange и Queue.
  • Exchange: обмен очередями сообщений.Направлять сообщения в очередь в соответствии с определенными правилами.
  • Queue: очередь сообщений, очередь для хранения сообщений.
  • Производитель: производитель сообщения. Клиент производителя направляет сообщение в очередь с маршрутом обмена.
  • Потребитель: потребитель сообщений. Использовать сообщения, хранящиеся в очереди.

Как эти компоненты работают вместе?Общий процесс выглядит следующим образом, см. следующий рисунок:

  • Производитель сообщений подключается к RabbitMQ Broker, создает соединение и открывает канал.
  • Производитель объявляет тип переключателя, имя, постоянство и т. д.
  • Производитель отправляет сообщение и указывает, является ли сообщение постоянным, а также другие атрибуты и ключи маршрутизации.
  • После того, как обмен получает сообщение,Маршрутизация в соответствующую очередь, привязанную к текущему коммутатору в соответствии с ключом маршрутизациив.
  • Потребитель начинает бизнес-обработку после прослушивания полученного сообщения.

Шесть, четыре типа и использование Exchange

Как видно из приведенного выше рабочего процесса, на самом деле есть ключевой компонент Exchange, т.к.После отправки сообщения в RabbitMQ оно должно сначала пройти маршрутизацию Exchange, чтобы найти соответствующую очередь..

На самом деле существует четыре типа Exchange, и они работают по-разному в зависимости от типа. В примере с HelloWord мы используем относительно простойDirect Exchange, трансляция напрямую связана с коммутатором. Остальные три:Обмен ответвлениями, Обмен темами, Обмен заголовками.

6.1 Direct Exchange

Как видно из текста, напрямую подключенный коммутатор означает, что этот коммутатор должен быть привязан к очереди, для чего требуетсяСообщение точно соответствует определенному ключу маршрутизации. Проще говоря, это отправка «один к одному», «точка-точка».

Полный код — это приведенный выше пример HelloWord, без повторения кода.

6.2 Fanout exchange

Этот тип обмена требует, чтобы очередь была привязана к обмену.Сообщение, отправленное на биржу, перенаправляется во все очереди, связанные с биржой.. Как и при широковещании подсети, узлы в каждой подсети получают копию сообщения. Проще говоря, это публикация и подписка.

Как писать код, покажите:

Сначала настройте имена коммутаторов и очередей:

public class RabbitMQConfig {
    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 A 的名称
     */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";

    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的队列 B 的名称
     */
    public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";

    /**
     * RabbitMQ的FANOUT_EXCHANG交换机类型的名称
     */
    public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";

}

Затем настройте переключатель типа FanoutExchange и две очереди A и B и привяжите их. Этот тип не требует настройки ключа маршрутизации:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Resource
    private RabbitAdmin rabbitAdmin;
    
    @Bean
    public Queue fanoutExchangeQueueA() {
        //队列A
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
    }

    @Bean
    public Queue fanoutExchangeQueueB() {
        //队列B
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
    }

    @Bean
    public FanoutExchange rabbitmqDemoFanoutExchange() {
        //创建FanoutExchange类型交换机
        return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindFanoutA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
    }

    @Bean
    public Binding bindFanoutB() {
        //队列B绑定到FanoutExchange交换机
        return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //启动项目即创建交换机和队列
        rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
        rabbitAdmin.declareQueue(fanoutExchangeQueueB());
        rabbitAdmin.declareQueue(fanoutExchangeQueueA());
        return null;
    }
}

Способ создания сервиса для публикации сообщения:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    //发布消息
    @Override
    public String sendMsgByFanoutExchange(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
    //组装消息体
    private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        return map;
    }
}

Интерфейс контроллера:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    /**
     * 发布消息
     *
     * @author java技术爱好者
     */
    @PostMapping("/publish")
    public String publish(@RequestParam(name = "msg") String msg) throws Exception {
        return rabbitMQService.sendMsgByFanoutExchange(msg);
    }
}

Затем на стороне потребительского проекта создайте два класса прослушивателя очереди, чтобы прослушивать очередь для потребления:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列A收到消息:" + map.toString());
    }

}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列B收到消息:" + map.toString());
    }
}

Затем запустите проекты производителя и потребителя, вы увидите, что интерфейс управления создал коммутатор FanoutExchange и две очереди и связал их:

Используйте POSTMAN для отправки сообщений, проверьте:

Затем вы можете увидеть консоль, обе очереди получили одно и то же сообщение одновременно, формируя эффект публикации и подписки:

6.3 Topic Exchange

Если он переводится напрямую, это называется обменом темами, если он переводится из употребления, его можно назвать обменом подстановочными знаками, что более уместно. Этот переключатель использует подстановочные знаки для сопоставления и маршрутизации в соответствующую очередь. Существует два типа подстановочных знаков: «*» и «#». Следует отметить, что перед подстановочным знаком должен стоять символ «.».

*Символ: соответствует одному и только одному слову. Напримерa.*Может соответствовать "ab", "ac", но не "abc".

#Символ: соответствует одному или нескольким словам. Например, «rabbit.#» может соответствовать «rabbit.a.b», «rabbit.a» или «rabbit.a.b.c».

Без лишних слов давайте продемонстрируем код:

Еще настройте имя TopicExchange и имена трех очередей:

    /**
     * RabbitMQ的TOPIC_EXCHANGE交换机名称
     */
    public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";

    /**
     * RabbitMQ的TOPIC_EXCHANGE交换机的队列A的名称
     */
    public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";

    /**
     * RabbitMQ的TOPIC_EXCHANGE交换机的队列B的名称
     */
    public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";

    /**
     * RabbitMQ的TOPIC_EXCHANGE交换机的队列C的名称
     */
    public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";

Тогда еще старый рецепт, настроить свичи и очереди, потом биндить и создавать:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    //省略...
    
    @Bean
    public TopicExchange rabbitmqDemoTopicExchange() {
        //配置TopicExchange交换机
        return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Queue topicExchangeQueueA() {
        //创建队列1
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueB() {
        //创建队列2
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public Queue topicExchangeQueueC() {
        //创建队列3
        return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
    }

    @Bean
    public Binding bindTopicA() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueB())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicB() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueC())
                .to(rabbitmqDemoTopicExchange())
                .with("a.*");
    }

    @Bean
    public Binding bindTopicC() {
        //队列A绑定到FanoutExchange交换机
        return BindingBuilder.bind(topicExchangeQueueA())
                .to(rabbitmqDemoTopicExchange())
                .with("rabbit.#");
    }
    
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
        rabbitAdmin.declareQueue(topicExchangeQueueA());
        rabbitAdmin.declareQueue(topicExchangeQueueB());
        rabbitAdmin.declareQueue(topicExchangeQueueC());
        return null;
    }
}

Затем напишите сервисный метод, который отправляет сообщение:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Override
    public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            //发送消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Напишите интерфейс контроллера:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    /**
     * 通配符交换机发送消息
     *
     * @author java技术爱好者
     */
    @PostMapping("/topicSend")
    public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
        return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
    }
}

После того, как производитель закончит писать, напишите потребительскую сторону.Потребительская сторона относительно проста, и пишутся три класса мониторинга:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A))
public class TopicExchangeConsumerA {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "]收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B))
public class TopicExchangeConsumerB {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B+ "]收到消息:" + map.toString());
    }
}

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C))
public class TopicExchangeConsumerC {

    @RabbitHandler
    public void process(Map<String, Object> map) {
        System.out.println("队列[" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "]收到消息:" + map.toString());
    }
}

Готово, затем запустите проект и начните отладку. После успешного запуска вы можете увидеть взаимосвязь между очередью и привязкой ключа маршрутизации:

Протестируйте через POSTMAN, чтобы проверить, можно ли успешно сопоставить ключ маршрутизации rabbit.#:

Тест прошел успешно, очередь A потребляет сообщение:

Затем проверьте ключи маршрутизации a.*, отправив routingKey = a.b :

Чаще используются три вышеуказанных: прямое подключение (DirectExchange), публикация и подписка (FanoutExchange), подстановочный знак (TopicExchange). Умелое использование этих трех типов коммутаторов может решить большинство бизнес-сценариев.

На самом деле, если вы немного подумаете об этом, вы обнаружите, что режим подстановочных знаков (TopicExchange) фактически может достигать двух эффектов: прямого подключения (DirectExchange) и публикации и подписки (FanoutExchange).

FanoutExchange не нужно привязывать routingKey, поэтому производительность будет лучше, чем у TopicExchange.

6.4 Headers Exchange

Таких выключателей относительно немного.Он немного отличается от трех предыдущих: его маршрутизация осуществляется не с помощью routingKey, а путем сопоставления значения ключа в заголовке запроса.. как показано на рисунке:

Чтобы создать очередь, вам нужно установить связанную информацию заголовка.Есть два режима:Полное совпадение и частичное совпадение. Как показано на рисунке выше, коммутатор будет сопоставлять значение ключа, связанное с очередью, в соответствии со значением ключа, содержащимся в информации заголовка, отправленной производителем, и направлять в соответствующую очередь. Как реализовать код, посмотрите на демо-код ниже:

Во-первых, вам все еще нужно определить имя коммутатора и имя очереди:

    /**
     * HEADERS_EXCHANGE交换机名称
     */
    public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";

    /**
     * RabbitMQ的HEADERS_EXCHANGE交换机的队列A的名称
     */
    public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";

    /**
     * RabbitMQ的HEADERS_EXCHANGE交换机的队列B的名称
     */
    public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";

Затем настройте переключатели, очереди и привязку:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
    @Bean
    public Queue headersQueueA() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
    }

    @Bean
    public Queue headersQueueB() {
        return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
    }

    @Bean
    public HeadersExchange rabbitmqDemoHeadersExchange() {
        return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
    }

    @Bean
    public Binding bindHeadersA() {
        Map<String, Object> map = new HashMap<>();
        map.put("key_one", "java");
        map.put("key_two", "rabbit");
        //全匹配
        return BindingBuilder.bind(headersQueueA())
                .to(rabbitmqDemoHeadersExchange())
                .whereAll(map).match();
    }

    @Bean
    public Binding bindHeadersB() {
        Map<String, Object> map = new HashMap<>();
        map.put("headers_A", "coke");
        map.put("headers_B", "sky");
        //部分匹配
        return BindingBuilder.bind(headersQueueB())
                .to(rabbitmqDemoHeadersExchange())
                .whereAny(map).match();
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
        rabbitAdmin.declareQueue(headersQueueA());
        rabbitAdmin.declareQueue(headersQueueB());
        return null;
    }
}

Напишите еще один метод службы для отправки сообщения:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
        try {
            MessageProperties messageProperties = new MessageProperties();
            //消息持久化
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messageProperties.setContentType("UTF-8");
            //添加消息
            messageProperties.getHeaders().putAll(map);
            Message message = new Message(msg.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

Напишите другой интерфейс контроллера:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    
    @PostMapping("/headersSend")
    @SuppressWarnings("unchecked")
    public String headersSend(@RequestParam(name = "msg") String msg,
                              @RequestParam(name = "json") String json) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> map = mapper.readValue(json, Map.class);
        return rabbitMQService.sendMsgByHeadersExchange(msg, map);
    }
}

После того, как производитель закончит запись, напишите два класса прослушивателя очереди для потребления:

@Component
public class HeadersExchangeConsumerA {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "]收到消息:" + new String(message.getBody(), contentType));
    }
}

@Component
public class HeadersExchangeConsumerB {
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
    public void process(Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        String contentType = messageProperties.getContentType();
        System.out.println("队列[" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]收到消息:" + new String(message.getBody(), contentType));
    }
}

Готово ~ Запустите проект, откройте интерфейс управления, мы можем увидеть информацию об очереди привязки обмена:

Это то же самое, что схематическая диаграмма выше ~ это доказывает, что проблем нет, все под контролем. Отправьте с помощью POSTMAN для проверки полностью соответствующей очереди A:

Снова протестируйте частично совпавшую очередь B:

Суммировать

Эта статья написана здесь первой. Повторите то, что вы узнали:

  • Что такое очередь сообщений? Зачем использовать очереди сообщений?
  • Возможности RabbitMQ, компоненты, рабочий процесс
  • Установите RabbitMQ и завершите небольшой кейс HelloWord.
  • Особенности четырех типов переключателей RabbitMQ и способы их использования

На самом деле RabbitMQ тоже имеет механизм транзакций и балансировку нагрузки, потому что он немного длинный, почти 5000 слов. Так что давайте поговорим об этом в следующем выпуске, ждите с нетерпением.

Коды всех приведенных выше примеров выложены на гитхаб:

GitHub.com/yehongqin/no…

Если вы считаете, что эта статья была вам полезна, ставьте лайк~

Ваши лайки - самая большая мотивация для моего творчества~

Если вы хотите впервые увидеть мои обновленные статьи, вы можете выполнить поиск в общедоступной учетной записи на WeChat "java技术爱好者",Не хочу быть соленой рыбой, я программист, стремящийся запомниться всем. Увидимся в следующий раз! ! ! 在这里插入图片描述

Возможности ограничены, если есть какие-то ошибки или неуместности, просьба критиковать и исправлять их, учиться и обмениваться вместе!