Exchange
Exchange: Exchange используется для пересылки сообщений, но они не будут сохраняться.Если нет привязки Queue к Exchange, он будет напрямую отбрасывать сообщения, отправленные производителем. Здесь есть более важная концепция: ключи маршрутизации. Когда сообщение поступает на коммутатор, взаимодействие будет переадресовано в соответствующую очередь, поэтому очередь, в которую будет перенаправлено сообщение, зависит от ключа маршрутизации.
Основная функция коммутатора — прием сообщений и пересылка их в связанную очередь.Коммутатор не хранит сообщения.После включения режима ack, если коммутатор не может найти очередь, он вернет ошибку.
Существует четыре типа переключателей: Direct, Topic, Headers и Fanout.
* Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
* Topic:按规则转发消息(最灵活)
* Headers:设置 header attribute 参数类型的交换机
* Fanout:转发消息到所有绑定队列(广播模式)
Ниже описано основное использование трех наиболее часто используемых режимов.
Интеграция SpringBoot
Пом-зависимости
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
файл конфигурации application.properties
# rabbitmq连接参数
spring.rabbitmq.host= # mq ip地址
spring.rabbitmq.port=5672 # 端口 默认5672
spring.rabbitmq.username=admin # 用户名
spring.rabbitmq.password=admin # 密码
# 开启发送确认(开启此模式,生产者成功发送到交换机后执行相应的回调函数)
#spring.rabbitmq.publisher-confirms=true
# 开启发送失败退(开启此模式,交换机路由不到队列时执行相应的回调函数)
#spring.rabbitmq.publisher-returns=true
# 开启消费者手动确认 ACK 默认auto
#spring.rabbitmq.listener.direct.acknowledge-mode=manual
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
Direct Exchange
Правило маршрутизации Exchange прямого типа очень простое, оно направит сообщение в очередь, ключ привязки которой точно соответствует ключу маршрутизации.
- Определите классы конфигурации, зарегистрируйте коммутаторы и очереди и привяжите их
/**
* Rabbit 配置类
* @author peng
*/
@Configuration
public class DirectRabbitConfig {
@Bean
DirectExchange directExchange(){
// 注册一个 Direct 类型的交换机 默认持久化、非自动删除
return new DirectExchange("directExchange");
}
@Bean
Queue infoQueue(){
// 注册队列
return new Queue("infoMsgQueue");
}
@Bean
Queue warnQueue(){
return new Queue("warnMsgQueue");
}
@Bean
Binding infoToExchangeBinging(Queue infoQueue, DirectExchange directExchange) {
// 将队列以 info-msg 为绑定键绑定到交换机
return BindingBuilder.bind(infoQueue).to(directExchange).with("info-msg");
}
@Bean
Binding warnToExchangeBinging(Queue warnQueue, DirectExchange directExchange) {
return BindingBuilder.bind(warnQueue).to(directExchange).with("warn-msg");
}
}
- определить производителя
/**
* 生产者
* @author peng
*/
@Component
public class DirectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendInfo() {
String content = "I am Info msg!";
// 将消息以info-msg绑定键发送到directExchange交换机
this.rabbitTemplate.convertAndSend("directExchange", "info-msg", content);
System.out.println("########### SendInfoMsg : " + content);
}
public void sendWarn() {
String content = "I am Warn msg!";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendWarn(int i) {
String content = "I am Warn msg! " + i;
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
public void sendError() {
String content = "I am Error msg!";
this.rabbitTemplate.convertAndSend("directExchange", "error-msg", content);
System.out.println("########### SendErrorMsg : " + content);
}
}
- определить потребителей
消费者1
/**
* @author peng
*/
@Component
// 标记此类为Rabbit消息监听类,监听队列infoMsgQueue
@RabbitListener(queues = "infoMsgQueue")
public class DirectReceiver1 {
// 定义处理消息的方法
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver1 Receive InfoMsg:" + message);
}
}
消费者2
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver2 Receive warnMsg:" + message);
}
}
- Базовый тест использования
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class DirectTest {
@Autowired
private DirectSender directSender;
@Test
public void send() {
directSender.sendInfo();
directSender.sendWarn();
directSender.sendError();
}
}
结果
########### SendInfoMsg : I am Info msg!
########### SendWarnMsg : I am Warn msg!
########### DirectReceiver2 Receive warnMsg:I am Warn msg!
########### DirectReceiver1 Receive InfoMsg:I am Info msg!
InfoMsg 以info-msg绑定键发送到directExchange交换机,交换机路由到infoMsgQueue队列,DirectReceiver1监听此队列接受消息。
WarnMsg 同理
ErrorMsg 由于没有队列的绑定键为 error-msg 所以消息会被丢弃
- тест один ко многим
消费者3
@Component
@RabbitListener(queues = "warnMsgQueue")
public class DirectReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("########### DirectReceiver3 Receive warnMsg:" + message);
}
}
// 一对多
@Test
public void oneToMany() {
for (int i = 0; i< 100 ; i++){
directSender.sendWarn(i);
}
}
结果
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 6
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 8
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 10
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 5
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 7
消费者2 和 消费者3 均匀(条数上)的消费了消息
- тест "многие ко многим"
/**
* 生产者3
* @author peng
*/
@Component
public class DirectSender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendWarn(int i) {
String content = "I am Warn msg! " + i +" fromSend2";
this.rabbitTemplate.convertAndSend("directExchange", "warn-msg", content);
System.out.println("########### SendWarnMsg : " + content);
}
}
// 多对多
@Test
public void manyToMany() {
for (int i = 0; i< 10 ; i++){
directSender.sendWarn(i);
directSender2.sendWarn(i);
}
}
结果
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 0 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 0 fromSend1
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 1 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 1 fromSend2
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 2 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 2 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 3 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 3 fromSend1
########### DirectReceiver3 Receive warnMsg:I am Warn msg! 4 fromSend2
########### DirectReceiver2 Receive warnMsg:I am Warn msg! 4 fromSend1
消费者2和消费者3分别接受了生产者1 和生产者2的消息
Fanout Exchang
Правила маршрутизации Exchange типа fanout очень просты и отправляются во все очереди, привязанные к бирже. ключи маршрутизации игнорируются
- класс конфигурации
/**
* @author peng
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
Queue queue1(){
return new Queue("fanout.1");
}
@Bean
Queue queue2(){
return new Queue("fanout.2");
}
@Bean
Queue queue3(){
return new Queue("fanout.3");
}
@Bean
Binding bindingExchange1(Queue queue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
Binding bindingExchange2(Queue queue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
@Bean
Binding bindingExchange3(Queue queue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue3).to(fanoutExchange);
}
}
- режиссер
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
System.out.println("######## Sender : " + context);
}
}
- потребитель
消费者1
/**
* @author peng
*/
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 1 : " + message);
}
}
消费者2
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 2 : " + message);
}
}
消费者3
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver 3 : " + message);
}
}
- тестовое задание
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void send() {
fanoutSender.send();
}
}
结果
######## Sender : hi, fanout msg
fanout Receiver 1 : hi, fanout msg
fanout Receiver 2 : hi, fanout msg
fanout Receiver 3 : hi, fanout msg
Topic Exchange
Тип темы соответствует наиболее широко. Ключ маршрутизации должен совпадать с ключом привязки (который может быть неопределенно сопоставлен с помощью подстановочных знаков) перед отправкой сообщения в очередь.
* соответствует одному слову, # соответствует нескольким словам, разделенным . Например, *.самец.# может соответствовать собака.самец.четыре, кролик.самец.четыре.белый и т.д.
- класс конфигурации
@Configuration
public class TopicRabbitConfig {
@Bean
TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
Queue topicQueue1(){
return new Queue("topicQueue1");
}
@Bean
Queue topicQueue2(){
return new Queue("topicQueue2");
}
@Bean
Queue topicQueue3(){
return new Queue("topicQueue3");
}
@Bean
Binding topicQueue1Binding(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("*.male.four");
}
@Bean
Binding topicQueue2Binding(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("#.four");
}
@Bean
Binding topicQueue3Binding(Queue topicQueue3, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue3).to(exchange).with("hen.female.two");
}
}
- режиссер
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "##### i am message 1";
System.out.println("Sender 1 : " + context);
// 路由键 rabbit.male.four 会被绑定键 *.male.four 和 #.four匹配
this.rabbitTemplate.convertAndSend("topicExchange", "rabbit.male.four", context);
}
public void send2() {
String context = "##### i am message 2";
System.out.println("Sender 2: " + context);
// 路由键 dog.male.four 会被绑定键 #.four 匹配
this.rabbitTemplate.convertAndSend("topicExchange", "dog.female.four", context);
}
public void send3() {
String context = "##### i am messages 3";
System.out.println("Sender 3: " + context);
路由键 hen.female.two 会被绑定键 hen.female.two 匹配
this.rabbitTemplate.convertAndSend("topicExchange", "hen.female.two", context);
}
}
- потребитель
消费者1
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicReceiver1 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver1 : " + msg);
}
}
消费者2
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver2 : " + msg);
}
}
消费者3
@Component
@RabbitListener(queues = "topicQueue3")
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println("Topic Receiver3 : " + msg);
}
}
- тестовое задание
@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TopicTest {
@Autowired
private TopicSender topicSender;
@Test
public void send() {
topicSender.send1();
topicSender.send2();
topicSender.send3();
}
}
结果
Sender 1: ##### i am message 1
Sender 2: ##### i am message 2
Sender 3: ##### i am messages 3
Topic Receiver1 : ##### i am message 1
Topic Receiver2 : ##### i am message 1
Topic Receiver3 : ##### i am messages 3
Topic Receiver2 : ##### i am message 2
消息1 被消费者1和2消费 路由键 rabbit.male.four 会被绑定键 *.male.four 和 #.four匹配
消息2 被消费者2消费 路由键 dog.male.four 会被绑定键 #.four 匹配
消息3 被消费者3消费 相当于 direct完全匹配