MQ означает очередь сообщений, а очередь сообщений (MQ) — это метод связи между приложениями. MQ — типичный представитель модели потребитель-производитель: один конец непрерывно записывает сообщения в очередь сообщений, а другой конец может читать сообщения в очереди. Основная функция ПО промежуточного слоя для сообщений — разделение. Наиболее стандартное использование ПО промежуточного слоя – это то, что производители создают сообщения и отправляют их в очереди, а потребители берут сообщения из очередей и обрабатывают их. не важно, кто производит сообщение, чтобы достичь цели разъединения. В распределенных системах очереди сообщений также используются во многих других аспектах, таких как: поддержка распределенных транзакций, вызовы RPC и так далее.
Введение в RabbitMQ
Rabbitmq - это своего рода промежуточное программное обеспечение, которое реализует AMQP (продвинутый протокол о передовой очереди сообщений). Это изначально возникло из финансовой системы и используется для хранения и пересылки сообщений в распределенных системах. Он хорошо работает с точки зрения простоты использования, масштабируемости и высокой наличие., Rabbitmq в основном реализуется для достижения двунаправленного развязки между системами. Когда производитель производит большое количество данных, потребитель не может быстро потреблять его быстро, поэтому требуется промежуточный слой. Сохраните эти данные.
AMQP, Advanced Message Queuing Protocol, является открытым стандартом для протоколов прикладного уровня, разработанным для промежуточного программного обеспечения, ориентированного на сообщения. Промежуточное программное обеспечение сообщений в основном используется для разделения компонентов, отправителю сообщения не нужно знать о существовании потребителя сообщения, и наоборот. Основными функциями AMQP являются ориентированность на сообщения, организация очередей, маршрутизация (включая двухточечную связь и публикацию/подписку), надежность и безопасность.
RabbitMQ — это реализация AMQP с открытым исходным кодом, серверная часть написана на языке Erlang, поддерживает множество клиентов, таких как: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP и т. д., поддерживает АЯКС. Он используется для хранения и пересылки сообщений в распределенных системах и хорошо работает с точки зрения простоты использования, масштабируемости и высокой доступности.
Связанные концепции
Обычно, когда мы говорим о службах очередей, есть три понятия: отправитель, очередь и получатель. RabbitMQ добавляет дополнительный уровень абстракции поверх этой базовой концепции. Между отправителем и очередью добавляется обмен (Exchange). Таким образом, нет прямой связи между отправителем и очередью, вместо этого отправитель отправляет сообщение на биржу, а биржа отправляет сообщение в очередь в соответствии с политикой планирования.
Ну и еще 4 важных понятия, а именно: виртуальный хост, свитч, очередь и привязка.
- Виртуальный хост: виртуальный хост содержит набор коммутаторов, очередей и привязок. Зачем вам несколько виртуальных хостов? Проще говоря, в RabbitMQ пользователи могут управлять разрешениями только на уровне детализации виртуальных хостов. Следовательно, если группе A необходимо запретить доступ к коммутаторам/очередям/привязкам группы B, необходимо создать виртуальный хост для A и B соответственно. Каждый сервер RabbitMQ имеет виртуальный хост по умолчанию «/».
- Exchange: Exchange используется для пересылки сообщений, но они не будут сохраняться.Если нет привязки Queue к Exchange, он будет напрямую отбрасывать сообщения, отправленные производителем. Здесь есть более важная концепция: ключи маршрутизации. Когда сообщение поступает на коммутатор, взаимодействие будет переадресовано в соответствующую очередь, поэтому очередь, в которую будет перенаправлено сообщение, зависит от ключа маршрутизации.
- Привязка: то есть коммутатор должен быть привязан к очереди, которая представляет собой отношение «многие ко многим», как показано на рисунке выше.
Четыре вида переключателей (Exchange)
Основная функция коммутатора — прием сообщений и пересылка их в связанную очередь.Коммутатор не хранит сообщения.После включения режима ack, если коммутатор не может найти очередь, он вернет ошибку. Существует четыре типа переключателей: Direct, Topic, Headers и Fanout.
1. Direct Exchange
Поведение прямого типа: «сначала сопоставить, а затем отправить». То есть при привязке устанавливается ключ routing_key, и при совпадении ключа routing_key оно будет отправлено обменом в связанную очередь. Это значение по умолчанию. обмен RabbitMQ Шаблон, который также является самым простым шаблоном, ищет очередь в соответствии с полным текстовым совпадением ключа.
настроить: установить ключ маршрутизации
public static final String QUEUE="queue";
/**
* direct 交换机模式
*/
@Bean
public Queue queue(){
return new Queue(QUEUE,true);
}
Отправить услугу:
@Service
@Slf4j
public class MQSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send(Object message){
String msg = (String) message;
log.info("send msg"+message);
amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);
}
}
Получить услугу:
@Service
@Slf4j
public class MQReceiver {
//监听的queue
@RabbitListener(queues = MQConfig.QUEUE)
public void receive(String msg){
log.info("receive msg "+msg);
}
}
тестовое задание:
@Autowired
private MQSender sender;
sender.send("hello direct Exchange");
2. Topic Exchange
Пересылка сообщений по правилам (наиболее гибкие) Пересылка сообщений в основном основана на подстановочных знаках. При таком типе коммутатора привязка очереди и коммутатора определяет режим маршрутизации, после чего коммутатор может пересылать сообщение только после совпадения подстановочного знака между режимом маршрутизации и ключом маршрутизации.
Ключ маршрутизации должен представлять собой строку символов, разделенных точками (.),
Шаблон маршрутизации должен содержать звездочку (*), которая в основном используется для соответствия слову в указанной позиции ключа маршрутизации, а знак решетки (#) эквивалентен одному или нескольким словам.
Класс конфигурации:
public static final String TOPIC_QUEUE1="topic.queue1";
public static final String TOPIC_QUEUE2="topic.queue2";
public static final String ROUTING_KEY1="topic.key1";
public static final String ROUTING_KEY2="topic.#";
/**
* Topic 交换机模式 可以用通配符
*/
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
}
Отправить класс:
public void sendTopic(Object message){
String msg = (String) message;
log.info("send topic message"+msg);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2");
}
Получить класс:
@RabbitListener(queues = MQConfig.TOPIC_QUEUE1)
public void receiveTopic1(String msg){
log.info("receive topic1 msg "+msg);
}
тестовое задание:
@Autowired
private MQSender sender;
sender.sendTopic("hello topic Exchange");
3. Headers Exchange
Обмен, который устанавливает тип параметра атрибута заголовка, по сравнению с прямым и топиком с фиксированным использованием routing_key, заголовки представляют собой тип настраиваемых правил сопоставления.Когда очередь привязана к обмену, набор правил пары ключ-значение, сообщение Он также включает набор пар ключ-значение (атрибут заголовков), когда одна или все эти пары ключ-значение совпадают, сообщение отправляется в соответствующую очередь.
public static final String HEADER_EXCHANGE="headerExchange";
/**
* Header 交换机模式
*/
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADER_EXCHANGE);
}
@Bean
public Queue headerQueue(){
return new Queue(HEADER_QUEUE2,true);
}
// 绑定需要指定header,如果不匹配 则不能使用
@Bean
public Binding headerBinding(){
Map<String,Object> map = new HashMap();
map.put("header1","value1");
map.put("header2","value2");
return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();
}
public void sendHeader(Object massage){
String msg = (String) massage;
log.info("send fanout message: "+msg);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1","value1");
properties.setHeader("header2","value2");
Message obj = new Message(msg.getBytes(),properties);
amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);
}
Используйте MessageProperties, чтобы добавить информацию о заголовке, а затем сравните ее с заголовком получателя. Я установил «заголовок1», «значение1», «заголовок2», «значение2».
//监听 header模式的queue
@RabbitListener(queues = MQConfig.HEADER_QUEUE2)
//因为发送的是 byte 类型,所以接受也是该数据类型
public void receiveHeader(byte[] message){
log.info("header queue message"+new String(message));
}
тестовое задание:
@Autowired
private MQSender sender;
sender.sendHeader("hello header Exchange");
4. Fanout Exchange
Перенаправляет сообщения во все связанные очереди. Режим широковещательной рассылки сообщений, независимо от ключа маршрутизации или режима маршрутизации, будет отправлять сообщение во все связанные с ним очереди. Если параметр routing_key настроен, он будет игнорироваться.
public static final String FANOUT_EXCHANGE="fanoutExchange";
/**
* Fanout 交换机模式(广播模式),不用绑定key
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
}
public void sendFanout(Object massage){
String msg = (String) massage;
log.info("send fanout message: "+msg);
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);
}
тестовое задание:
@Autowired
private MQSender sender;
sender.sendFanout("hello fanout Exchange");
Пополнить
Этот пример основан на springboot.
пом-зависимость
<!--rabbbitMQ相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
конфигурационный файл
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 这个账号密码只能连接本地的mq,远程的话需要配置
virtual-host: /
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1 # 从队列每次取一个
auto-startup: true
default-requeue-rejected: true # 失败后重试
В будущем он будет использоваться для реализации небольшой функции захвата билетов.
Если вам нравятся мои статьи, обратите внимание на мой паблик аккаунтPlayInJava
, официальная учетная запись посвящена анализу технологий архитектора, официальная учетная запись все еще находится на начальной стадии, спасибо за вашу поддержку.
java架构
Получите архитектурные видеоресурсы (другие высококачественные ресурсы будут опубликованы позже).