Интеграция с SpringBoot

Spring Boot

1. Сценарии применения ПО промежуточного слоя сообщений

Асинхронная обработка

Сцена: регистрация пользователя, после того, как информация написана в базе данных, вам нужно отправить пользователя отправить успешное сообщение, а затем отправить зарегистрированную успешную электронную почту.

1. Синхронный вызов: после успешной регистрации метод отправки электронной почты и отправки SMS выполняется последовательно и, наконец, отвечает пользователю.

2. Параллельный вызов: после успешной регистрации метод отправки электронных писем и текстовых сообщений выполняется одновременно в многопоточном режиме и, наконец, отвечает пользователю.

3. Очередь сообщений: после успешной регистрации сообщение для отправки записывается в очередь сообщений за короткое время, а затем отвечает пользователю, служба отправки почты и служба отправки коротких сообщений могут быть асинхронно прочитаны из очереди сообщений, а затем отправил Task.

Разделение приложений

Сценарий: после совершения покупок и размещения заказа позвоните в систему инвентаризации, чтобы обновить инвентарь.

1. Метод соединения: система заказов, запись логики для вызова системы инвентаризации.

2. Сердечный способ: система заказа, выданная сообщение написано в очередь сообщений, система инвентаризации читает сообщение из очереди сообщений, обновляя инвентаризацию.

Отсечение трафика

В сценарии seckill мы можем настроить очередь сообщений фиксированной длины, когда начинается seckill, тот, кто быстрее, входит в очередь первым, а затем быстро возвращается, если пользователь приходит за секунды, а затем плавно обрабатывает дело после seckill.

2. Обзор промежуточного программного обеспечения службы сообщений

    1. В большинстве приложений промежуточное программное обеспечение службы сообщений можно использовать для улучшения системной асинхронной связи и расширения возможностей развязки.
    1. Две важные концепции в службах сообщений: Брокер сообщений (message Broker) и пункт назначения (destination) Когда отправитель сообщения отправляет сообщение, оно передается брокеру сообщений, который гарантирует, что сообщение будет доставлено в указанное место назначения. земля.
    1. Существует две основные формы назначения очередей сообщений.
    1. Очередь: двухточечная передача сообщений
    1. Тема: публикация/подписка сообщения сообщения
    1. Точка-точка:
    1. Отправитель сообщения отправляет сообщение, брокер сообщений помещает его в очередь, получатель сообщения получает содержимое сообщения из очереди, и сообщение удаляется из очереди после прочтения.
    1. Сообщения имеют только одного отправителя и получателя, но это не значит, что может быть только один получатель.
    1. Публикация-подписка:
  • Отправитель (издатель) отправляет сообщение в тему, а несколько получателей (подписчиков) слушают (подписываются) в тему, тогда они получат сообщение одновременно, когда сообщение прибудет
    1. JMS (служба сообщений Java) Служба сообщений JAVA:
  • На основе спецификации брокера сообщений JVM. ActiveMQ и HornetMQ являются реализациями JMS.
    1. AMQP(Advanced Message Queuing Protocol)
  • Расширенный протокол очереди сообщений, также спецификация брокера сообщений, совместимая с JMS.
  • RabbitMQ — это реализация AMQP.

    1. Пружинная поддержка
  • spring-jms обеспечивает поддержку JMS
  • spring-rabbit обеспечивает поддержку AMQP
  • Для подключения к брокеру сообщений требуется реализация ConnectionFactory.
  • Предоставьте JmsTemplate, RabbitTemplate для отправки сообщений
  • Аннотации @JmsListener(JMS), @RabbitListener(AMQP) отслеживают сообщения, публикуемые брокером сообщений в методе.
  • @EnableJms, @EnableRabbit включить поддержку
    1. Автоматическая настройка Spring Boot
  • JmsAutoConfiguration
  • RabbitAutoConfiguration

3. Введение в RabbitMQ

RabbitMQ — это реализация AMQP (расширенный протокол очереди сообщений) с открытым исходным кодом, разработанная erlang.

1. Основные понятия

  • Message: сообщение, сообщение является анонимным, оно состоит из заголовка сообщения и тела сообщения. Тело сообщения непрозрачно, а заголовок сообщения состоит из ряда необязательных атрибутов, включая ключ маршрутизации (ключ маршрутизации), приоритет (приоритет по отношению к другим сообщениям), режим доставки (указывающий, что сообщению может потребоваться постоянное хранение), и Т. Д.
  • Publisher: Производитель сообщения, который также является клиентским приложением, которое публикует сообщения для обмена.
  • Exchange: Обмен, который получает сообщения, отправленные производителями, и направляет их в очереди на сервере. Exchange имеет 4 типа: прямой (по умолчанию), разветвленный, раздел и заголовки.Разные типы Exchange имеют разные стратегии пересылки сообщений.
  • Queue: очередь сообщений, используемая для хранения сообщений до тех пор, пока они не будут отправлены потребителям. Это контейнер для сообщения и пункт назначения для сообщения. Сообщение может быть помещено в одну или несколько очередей. Сообщение было в очереди, ожидая, пока потребитель подключится к очереди, чтобы забрать его.
  • Binding: привязка для ассоциации между очередями сообщений и обменами. Привязка — это правило маршрутизации, которое связывает обмен и очередь сообщений на основе ключа маршрутизации, поэтому обмен можно понимать как таблицу маршрутизации, состоящую из привязок. Связь между Exchange и Queue может быть отношением «многие ко многим».
  • Connection: Сетевое подключение, например TCP-соединение.
  • Channel: канал, независимый двунаправленный канал потока данных в мультиплексном соединении. Канал — это виртуальное соединение, установленное внутри реального TCP-соединения. Все команды AMQP отправляются через канал. Будь то публикация сообщений, подписка на очереди или получение сообщений, все эти действия выполняются через канал. Поскольку установление и уничтожение TCP очень дорого обходится операционной системе, для повторного использования TCP-соединения вводится понятие канала.
  • Consumer: Потребитель сообщений, представляющий клиентское приложение, которое получает сообщения из очереди сообщений.
  • Virtual Host: виртуальный хост, представляющий набор обменов, очередей сообщений и связанных объектов. Виртуальные хосты — это отдельные серверные домены, использующие одну и ту же среду аутентификации и шифрования. Каждый виртуальный хост по сути представляет собой мини-версию сервера RabbitMQ со своими собственными очередями, обменами, привязками и механизмами разрешений. vhost является основой концепции AMQP и должен быть указан при подключении.Vhost по умолчанию для RabbitMQ — / .
  • Broker: представляет сущность сервера очереди сообщений.

В-четвертых, рабочий механизм RabbitMQ

Маршруты сообщения в AMQP

Процесс маршрутизации сообщений AMQP и Java-разработчики знакомы с JMS, есть некоторые различия, AMQP добавлен к роли Exchange и Binding. Производители помещают сообщения, размещенные на бирже, в очередь сообщений и в конечном итоге достигают получателя, а обязательное решение о переключении сообщения должно быть отправлено в очередь.

Тип обмена

Exchange распределяет сообщения в соответствии с различными стратегиями распространения различных типов. В настоящее время существует четыре типа: прямое, разветвление, тема и заголовки. Заголовки соответствуют заголовку сообщения AMQP, а не ключу маршрутизации.Обмен заголовками точно такой же, как и прямой обмен, но производительность намного хуже, и он почти не используется в настоящее время, поэтому давайте посмотрим на остальные три типа. напрямую:

Пять, установка RabbitMQ

Мы используем докер для установки RabbitMQ.
Мы выбираем официальную последнюю версию с интерфейсом управления на Docker Hub.

#获取rabbitmq镜像
docker pull rabbitmq:3-management
#启动 rabbitmq镜像,5672是mq通信端口,15672是mq的web管理界面端口
run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 镜像ID

Посетите 127.0.0.1:15672, войдите в систему с учетной записью: guest пароль: guest, интерфейс выглядит следующим образом:

Подробное использование rabbitmq здесь не объясняется здесь. Фокус нашего раздела - интегрировать rabbitmq.

6. Интегрируйте RabbitMQ

Создайте проект для внедрения зависимостей rabbitmq.

1. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.gf</groupId>
	<artifactId>springboot-rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>springboot-rabbitmq</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

2. MyAMQPConfig

package com.gf.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * 自定义消息转换器,默认是jdk的序列化转换器,我们自定义为json的
 */
@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

3. тестовый класс весенней загрузки

Мы тестируем создание конфигурации администратора, отправку сообщения, получение сообщения

package com.gf;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@Autowired
	AmqpAdmin amqpAdmin;

	@Test
	public void contextLoads() {
	}

	@Test
	public void create(){
		//创建Exchange
		amqpAdmin.declareExchange( new DirectExchange( "exchange.direct") );
		amqpAdmin.declareExchange( new FanoutExchange( "exchange.fanout") );
		amqpAdmin.declareExchange( new TopicExchange( "exchange.topic") );

		//创建Queue
		amqpAdmin.declareQueue( new Queue( "direct.queue" , true ) );
		amqpAdmin.declareQueue( new Queue( "fanout.queue" , true ) );

		//绑定Queue
		amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "direct.queue" , null ) );
		amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.direct" , "fanout.queue" , null ) );
		amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null ) );
		amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.fanout" , "" , null ) );
		amqpAdmin.declareBinding( new Binding( "direct.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.#" , null ) );
		amqpAdmin.declareBinding( new Binding( "fanout.queue" , Binding.DestinationType.QUEUE , "exchange.topic" , "direct.*" , null ) );


	}

	@Test
	public void send2Direct() {
		Map<String , Object> map = new HashMap<>();
		map.put( "msg" , "这是一条点对点消息" );
		map.put( "data" , Arrays.asList("helloworld" , 123 , true) );

		rabbitTemplate.convertAndSend( "exchange.direct" , "direct.queue" , map );

	}

	@Test
	public void send2Topic() {
		Map<String , Object> map = new HashMap<>();
		map.put( "msg" , "这是一条广播消息" );
		map.put( "data" , Arrays.asList("topic消息" , 123 , true) );

		rabbitTemplate.convertAndSend( "exchange.fanout" , "", map );

	}

	@Test
	public void receive() {
		Object o = rabbitTemplate.receiveAndConvert( "direct.queue" );
		o.getClass();
		System.out.println(o.getClass());
		System.out.println(o);
	}

}

слушать сообщения

4. класс запуска

package com.gf;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 自动配置
 * 1. RabbitAutoConfiguration
 * 2. 自动配置了连接工厂ConnectionFactory
 * 3. RabbitProperties 封装了RabbitMQ的配置
 * 4. RabbitTemplate : 给RabbitMQ发送和接受消息
 * 5. AmqpAdmin : RabbitMQ系统管理功能组件
 * 6. @EnableRabbit + @RabbitListener
 */
@EnableRabbit
@SpringBootApplication
public class SpringbootRabbitmqApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringbootRabbitmqApplication.class, args);
	}
}

5. MQService

package com.gf.service;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQService {

    @RabbitListener(queues = "fanout.queue")
    public void receive(Message message) {
        System.out.println("收到消息 : " + new String(message.getBody()));

    }

}

Загрузка исходного кода:GitHub.com/Kung Fu-Change сенсорный экран…