Использование RabbitMQ в SpringBoot (1)

RabbitMQ

Exchange

Exchange: Exchange используется для пересылки сообщений, но они не будут сохраняться.Если нет привязки Queue к Exchange, он будет напрямую отбрасывать сообщения, отправленные производителем. Здесь есть более важная концепция: ключи маршрутизации. Когда сообщение поступает на коммутатор, взаимодействие будет переадресовано в соответствующую очередь, поэтому очередь, в которую будет перенаправлено сообщение, зависит от ключа маршрутизации.

Основная функция коммутатора — прием сообщений и пересылка их в связанную очередь.Коммутатор не хранит сообщения.После включения режима ack, если коммутатор не может найти очередь, он вернет ошибку.

Существует четыре типа переключателей: Direct, Topic, Headers и Fanout.

  * Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
  * Topic:按规则转发消息(最灵活)
  * Headers:设置 header attribute 参数类型的交换机
  * Fanout:转发消息到所有绑定队列(广播模式)

Ниже описано основное использование трех наиболее часто используемых режимов.

Интеграция SpringBoot

Пом-зависимости

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

файл конфигурации application.properties

# rabbitmq连接参数
spring.rabbitmq.host=  # mq ip地址
spring.rabbitmq.port=5672 # 端口 默认5672
spring.rabbitmq.username=admin # 用户名
spring.rabbitmq.password=admin # 密码
# 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数)
#spring.rabbitmq.publisher-confirms=true
# 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数)
#spring.rabbitmq.publisher-returns=true
# 开启消费者手动确认 ACK 默认auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual

Direct Exchange

Правило маршрутизации Exchange прямого типа очень простое, оно направит сообщение в очередь, ключ привязки которой точно соответствует ключу маршрутизации.

  • Определите классы конфигурации, зарегистрируйте коммутаторы и очереди и привяжите их
/**
 * Rabbit 配置类
 * @author peng
 */
@Configuration
public class DirectRabbitConfig {

    @Bean
    DirectExchange directExchange(){
        // 注册一个 Direct 类型的交换机 默认持久化、非自动删除
        return new DirectExchange("directExchange");
    }

    @Bean
    Queue infoQueue(){
        // 注册队列
        return new Queue("infoMsgQueue");
    }
    
    @Bean
    Queue warnQueue(){
        return new Queue("warnMsgQueue");
    }
    
    @Bean
    Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
        // 将队列以 info-msg 为绑定键绑定到交换机
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
    }
    
    @Bean
    Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
    }
}
  • определить производителя
/**
 * 生产者
 * @author peng
 */
@Component
public class DirectSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendInfo() {
        String content = "I am Info msg!";
        // 将消息以info-msg绑定键发送到directExchange交换机
        this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
        System.out.println("########### SendInfoMsg : " + content);
    }
    
    public void sendWarn() {
        String content = "I am Warn msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i;
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
    
    public void sendError() {
        String content = "I am Error msg!";
        this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
        System.out.println("########### SendErrorMsg : " + content);
    }
}

  • определить потребителей
消费者1
/**
 * @author peng
 */
@Component
// 标记此类为Rabbit消息监听类,监听队列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {

    // 定义处理消息的方法
    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
    }
}

消费者2 
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
    }
}
  • Базовый тест использования
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
    @Autowired
    private DirectSender directSender;

    @Test
    public void send() {
        directSender.sendInfo();
        directSender.sendWarn();
        directSender.sendError();
    }
}

结果

    ########### SendInfoMsg : I am Info msg!
    ########### SendWarnMsg : I am Warn msg!
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg!
    ########### DirectReceiver1 Receive InfoMsg:I am Info msg!
    
    InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。
    WarnMsg 同理
    ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃
  • тест один ко многим
消费者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
    }
}

// 一对多
@Test
public void oneToMany() {
    for (int i = 0; i< 100 ; i++){
        directSender.sendWarn(i);
    }
}

结果
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
    
    消费者2 和 消费者3 均匀(条数上)的消费了消息
  • тест "многие ко многим"
/**
 * 生产者3
 * @author peng
 */
@Component
public class DirectSender2 {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendWarn(int i) {
        String content = "I am Warn msg! " + i +" fromSend2";
        this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
        System.out.println("########### SendWarnMsg : " + content);
    }
}

// 多对多
@Test
public void manyToMany() {
    for (int i = 0; i< 10 ; i++){
        directSender.sendWarn(i);
        directSender2.sendWarn(i);
    }
}

结果

    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
    ########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
    ########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
   
    消费者2和消费者3分别接受了生产者1 和生产者2的消息

Fanout Exchang

Правила маршрутизации Exchange типа fanout очень просты и отправляются во все очереди, привязанные к бирже. ключи маршрутизации игнорируются

  • класс конфигурации
/**
 * @author peng
 */
@Configuration
public class FanoutRabbitConfig {

    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Queue queue1(){
        return new Queue("fanout.1");
    }
    @Bean
    Queue queue2(){
        return new Queue("fanout.2");
    }
    @Bean
    Queue queue3(){
        return new Queue("fanout.3");
    }

    @Bean
    Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue3).to(fanoutExchange);
    }
}
  • режиссер
@Component
public class FanoutSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "hi, fanout msg ";
        this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        System.out.println("######## Sender : " + context);
    }
}

  • потребитель
消费者1
/**
 * @author peng
 */
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 1  : " + message);
    }
}

消费者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 2  : " + message);
    }
}

消费者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("fanout Receiver 3  : " + message);
    }
}

  • тестовое задание
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
    @Autowired
    private FanoutSender fanoutSender;

    @Test
    public void send() {
        fanoutSender.send();
    }
}

结果

    ######## Sender : hi, fanout msg 
    fanout Receiver 1  : hi, fanout msg 
    fanout Receiver 2  : hi, fanout msg 
    fanout Receiver 3  : hi, fanout msg

Topic Exchange

Тип темы соответствует наиболее широко. Ключ маршрутизации должен совпадать с ключом привязки (который может быть неопределенно сопоставлен с помощью подстановочных знаков) перед отправкой сообщения в очередь.

* соответствует одному слову, # соответствует нескольким словам, разделенным . Например, *.самец.# может соответствовать собака.самец.четыре, кролик.самец.четыре.белый и т.д.

  • класс конфигурации
@Configuration
public class TopicRabbitConfig {

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    Queue topicQueue1(){
        return new Queue("topicQueue1");
    }
    @Bean
    Queue topicQueue2(){
        return new Queue("topicQueue2");
    }
    @Bean
    Queue topicQueue3(){
        return new Queue("topicQueue3");
    }

    @Bean
    Binding topicQueue1Binding(Queue topicQueue1, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue1).to(exchange).with("*.male.four");
    }
    @Bean
    Binding topicQueue2Binding(Queue topicQueue2, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue2).to(exchange).with("#.four");
    }
    
    @Bean
    Binding topicQueue3Binding(Queue topicQueue3, TopicExchange exchange) {
        return BindingBuilder.bind(topicQueue3).to(exchange).with("hen.female.two");
    }
}
  • режиссер
@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send1() {
        String context = "##### i am message 1";
        System.out.println("Sender 1 : " + context);
        // 路由键 rabbit.male.four 会被绑定键 *.male.four 和 #.four匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "rabbit.male.four", context);
    }

    public void send2() {
        String context = "##### i am message 2";
        System.out.println("Sender 2: " + context);
        // 路由键 dog.male.four 会被绑定键 #.four 匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "dog.female.four", context);
    }

    public void send3() {
        String context = "##### i am messages 3";
        System.out.println("Sender 3: " + context);
        路由键 hen.female.two 会被绑定键 hen.female.two 匹配
        this.rabbitTemplate.convertAndSend("topicExchange", "hen.female.two", context);
    }
}
  • потребитель
消费者1
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicReceiver1 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver1 : " + msg);
    }
}

消费者2
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver2 : " + msg);
    }
}

消费者3
@Component
@RabbitListener(queues = "topicQueue3")
public class TopicReceiver3 {
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Topic Receiver3 : " + msg);
    }
}

  • тестовое задание
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TopicTest {
    @Autowired
    private TopicSender topicSender;

    @Test
    public void send() {
        topicSender.send1();
        topicSender.send2();
        topicSender.send3();
    }
}

结果

    Sender 1: ##### i am message 1
    Sender 2: ##### i am message 2
    Sender 3: ##### i am messages 3
    Topic Receiver1 : ##### i am message 1
    Topic Receiver2 : ##### i am message 1
    Topic Receiver3 : ##### i am messages 3
    Topic Receiver2 : ##### i am message 2

消息1 被消费者1和2消费 路由键 rabbit.male.four 会被绑定键 *.male.four 和 #.four匹配
消息2 被消费者2消费  路由键 dog.male.four 会被绑定键 #.four 匹配
消息3 被消费者3消费  相当于 direct完全匹配