1. Сценарии применения ПО промежуточного слоя сообщений
Асинхронная обработка
Сцена: регистрация пользователя, после того, как информация написана в базе данных, вам нужно отправить пользователя отправить успешное сообщение, а затем отправить зарегистрированную успешную электронную почту.
1. Синхронный вызов: после успешной регистрации метод отправки электронной почты и отправки SMS выполняется последовательно и, наконец, отвечает пользователю.
2. Параллельный вызов: после успешной регистрации метод отправки электронных писем и текстовых сообщений выполняется одновременно в многопоточном режиме и, наконец, отвечает пользователю.
3. Очередь сообщений: после успешной регистрации сообщение для отправки записывается в очередь сообщений за короткое время, а затем отвечает пользователю, служба отправки почты и служба отправки коротких сообщений могут быть асинхронно прочитаны из очереди сообщений, а затем отправил Task.
Разделение приложений
Сценарий: после совершения покупок и размещения заказа позвоните в систему инвентаризации, чтобы обновить инвентарь.
1. Метод соединения: система заказов, запись логики для вызова системы инвентаризации.
2. Сердечный способ: система заказа, выданная сообщение написано в очередь сообщений, система инвентаризации читает сообщение из очереди сообщений, обновляя инвентаризацию.
Отсечение трафика
В сценарии seckill мы можем настроить очередь сообщений фиксированной длины, когда начинается seckill, тот, кто быстрее, входит в очередь первым, а затем быстро возвращается, если пользователь приходит за секунды, а затем плавно обрабатывает дело после seckill.
2. Обзор промежуточного программного обеспечения службы сообщений
-
- В большинстве приложений промежуточное программное обеспечение службы сообщений можно использовать для улучшения системной асинхронной связи и расширения возможностей развязки.
-
- Две важные концепции в службах сообщений: Брокер сообщений (message Broker) и пункт назначения (destination) Когда отправитель сообщения отправляет сообщение, оно передается брокеру сообщений, который гарантирует, что сообщение будет доставлено в указанное место назначения. земля.
-
- Существует две основные формы назначения очередей сообщений.
-
- Очередь: двухточечная передача сообщений
-
- Тема: публикация/подписка сообщения сообщения
-
- Точка-точка:
-
- Отправитель сообщения отправляет сообщение, брокер сообщений помещает его в очередь, получатель сообщения получает содержимое сообщения из очереди, и сообщение удаляется из очереди после прочтения.
-
- Сообщения имеют только одного отправителя и получателя, но это не значит, что может быть только один получатель.
-
- Публикация-подписка:
- Отправитель (издатель) отправляет сообщение в тему, а несколько получателей (подписчиков) слушают (подписываются) в тему, тогда они получат сообщение одновременно, когда сообщение прибудет
-
- JMS (служба сообщений Java) Служба сообщений JAVA:
- На основе спецификации брокера сообщений JVM. ActiveMQ и HornetMQ являются реализациями JMS.
-
- AMQP(Advanced Message Queuing Protocol)
- Расширенный протокол очереди сообщений, также спецификация брокера сообщений, совместимая с JMS.
- RabbitMQ — это реализация AMQP.
-
- Пружинная поддержка
- spring-jms обеспечивает поддержку JMS
- spring-rabbit обеспечивает поддержку AMQP
- Для подключения к брокеру сообщений требуется реализация ConnectionFactory.
- Предоставьте JmsTemplate, RabbitTemplate для отправки сообщений
- Аннотации @JmsListener(JMS), @RabbitListener(AMQP) отслеживают сообщения, публикуемые брокером сообщений в методе.
- @EnableJms, @EnableRabbit включить поддержку
-
- Автоматическая настройка Spring Boot
- JmsAutoConfiguration
- RabbitAutoConfiguration
3. Введение в RabbitMQ
RabbitMQ — это реализация AMQP (расширенный протокол очереди сообщений) с открытым исходным кодом, разработанная erlang.
1. Основные понятия
- Message: сообщение, сообщение является анонимным, оно состоит из заголовка сообщения и тела сообщения. Тело сообщения непрозрачно, а заголовок сообщения состоит из ряда необязательных атрибутов, включая ключ маршрутизации (ключ маршрутизации), приоритет (приоритет по отношению к другим сообщениям), режим доставки (указывающий, что сообщению может потребоваться постоянное хранение), и Т. Д.
- Publisher: Производитель сообщения, который также является клиентским приложением, которое публикует сообщения для обмена.
- Exchange: Обмен, который получает сообщения, отправленные производителями, и направляет их в очереди на сервере. Exchange имеет 4 типа: прямой (по умолчанию), разветвленный, раздел и заголовки.Разные типы Exchange имеют разные стратегии пересылки сообщений.
- Queue: очередь сообщений, используемая для хранения сообщений до тех пор, пока они не будут отправлены потребителям. Это контейнер для сообщения и пункт назначения для сообщения. Сообщение может быть помещено в одну или несколько очередей. Сообщение было в очереди, ожидая, пока потребитель подключится к очереди, чтобы забрать его.
- Binding: привязка для ассоциации между очередями сообщений и обменами. Привязка — это правило маршрутизации, которое связывает обмен и очередь сообщений на основе ключа маршрутизации, поэтому обмен можно понимать как таблицу маршрутизации, состоящую из привязок. Связь между Exchange и Queue может быть отношением «многие ко многим».
- Connection: Сетевое подключение, например TCP-соединение.
- Channel: канал, независимый двунаправленный канал потока данных в мультиплексном соединении. Канал — это виртуальное соединение, установленное внутри реального TCP-соединения. Все команды AMQP отправляются через канал. Будь то публикация сообщений, подписка на очереди или получение сообщений, все эти действия выполняются через канал. Поскольку установление и уничтожение TCP очень дорого обходится операционной системе, для повторного использования TCP-соединения вводится понятие канала.
- Consumer: Потребитель сообщений, представляющий клиентское приложение, которое получает сообщения из очереди сообщений.
- Virtual Host: виртуальный хост, представляющий набор обменов, очередей сообщений и связанных объектов. Виртуальные хосты — это отдельные серверные домены, использующие одну и ту же среду аутентификации и шифрования. Каждый виртуальный хост по сути представляет собой мини-версию сервера RabbitMQ со своими собственными очередями, обменами, привязками и механизмами разрешений. vhost является основой концепции AMQP и должен быть указан при подключении.Vhost по умолчанию для RabbitMQ — / .
- Broker: представляет сущность сервера очереди сообщений.
В-четвертых, рабочий механизм RabbitMQ
Маршруты сообщения в AMQP
Процесс маршрутизации сообщений AMQP и Java-разработчики знакомы с JMS, есть некоторые различия, AMQP добавлен к роли Exchange и Binding. Производители помещают сообщения, размещенные на бирже, в очередь сообщений и в конечном итоге достигают получателя, а обязательное решение о переключении сообщения должно быть отправлено в очередь.
Тип обмена
Exchange распределяет сообщения в соответствии с различными стратегиями распространения различных типов. В настоящее время существует четыре типа: прямое, разветвление, тема и заголовки. Заголовки соответствуют заголовку сообщения AMQP, а не ключу маршрутизации.Обмен заголовками точно такой же, как и прямой обмен, но производительность намного хуже, и он почти не используется в настоящее время, поэтому давайте посмотрим на остальные три типа. напрямую:
Пять, установка RabbitMQ
Мы используем докер для установки RabbitMQ.
Мы выбираем официальную последнюю версию с интерфейсом управления на Docker Hub.
#获取rabbitmq镜像
docker pull rabbitmq:3-management
#启动 rabbitmq镜像,5672是mq通信端口,15672是mq的web管理界面端口
run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 镜像ID
Посетите 127.0.0.1:15672, войдите в систему с учетной записью: guest пароль: guest, интерфейс выглядит следующим образом:
Подробное использование rabbitmq здесь не объясняется здесь. Фокус нашего раздела - интегрировать rabbitmq.
6. Интегрируйте RabbitMQ
Создайте проект для внедрения зависимостей rabbitmq.
1. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gf</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-rabbitmq</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. MyAMQPConfig
package com.gf.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* 自定义消息转换器,默认是jdk的序列化转换器,我们自定义为json的
*/
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3. тестовый класс весенней загрузки
Мы тестируем создание конфигурации администратора, отправку сообщения, получение сообщения
package com.gf;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void contextLoads() {
}
@Test
public void create(){
//创建Exchange
amqpAdmin.declareExchange( new DirectExchange( "exchange.direct") );
amqpAdmin.declareExchange( new FanoutExchange( "exchange.fanout") );
amqpAdmin.declareExchange( new TopicExchange( "exchange.topic") );
//创建Queue
amqpAdmin.declareQueue( new Queue( "direct.queue" , true ) );
amqpAdmin.declareQueue( new Queue( "fanout.queue" , true ) );
//绑定Queue
amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "direct.queue" , null ) );
amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "fanout.queue" , null ) );
amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null ) );
amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null ) );
amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.#" , null ) );
amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.*" , null ) );
}
@Test
public void send2Direct() {
Map<String , Object> map = new HashMap<>();
map.put( "msg" , "这是一条点对点消息" );
map.put( "data" , Arrays.asList("helloworld" , 123 , true) );
rabbitTemplate.convertAndSend( "exchange.direct" , "direct.queue" , map );
}
@Test
public void send2Topic() {
Map<String , Object> map = new HashMap<>();
map.put( "msg" , "这是一条广播消息" );
map.put( "data" , Arrays.asList("topic消息" , 123 , true) );
rabbitTemplate.convertAndSend( "exchange.fanout" , "", map );
}
@Test
public void receive() {
Object o = rabbitTemplate.receiveAndConvert( "direct.queue" );
o.getClass();
System.out.println(o.getClass());
System.out.println(o);
}
}
слушать сообщения
4. класс запуска
package com.gf;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 自动配置
* 1. RabbitAutoConfiguration
* 2. 自动配置了连接工厂ConnectionFactory
* 3. RabbitProperties 封装了RabbitMQ的配置
* 4. RabbitTemplate : 给RabbitMQ发送和接受消息
* 5. AmqpAdmin : RabbitMQ系统管理功能组件
* 6. @EnableRabbit + @RabbitListener
*/
@EnableRabbit
@SpringBootApplication
public class SpringbootRabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqApplication.class, args);
}
}
5. MQService
package com.gf.service;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQService {
@RabbitListener(queues = "fanout.queue")
public void receive(Message message) {
System.out.println("收到消息 : " + new String(message.getBody()));
}
}
Загрузка исходного кода:GitHub.com/Kung Fu-Change сенсорный экран…