Spring Boot (тринадцать): Spring Boot интегрирует RabbitMQ.

Spring Boot

1. Введение

RabbitMQ — это очередь сообщений. Когда речь идет об очередях сообщений, вы, возможно, более или менее слышали о ней. Его основная функция — реализовать асинхронность и развязку сервисов приложений, а также он может играть роль пикового бритья и распределения сообщений. . . .

Одной из основных функций очередей сообщений является отделение служб приложений. Сообщения передаются от производителей сообщений в очереди сообщений. Потребители получают сообщения из очередей сообщений и потребляют их. Производителям не нужно заботиться о том, кто потребляет. обратить внимание на того, кто создал сообщение. В распределенных системах очереди сообщений также используются в других местах, например, для поддержки распределенных транзакций, таких как RocketMQ с открытым исходным кодом от Alibaba.

Конечно, главный герой нашей статьи по-прежнему RabbitMQ.

2. Введение в RabbitMQ

RabbitMQ — это своего рода промежуточное программное обеспечение для сообщений, которое реализует AMQP (Advanced Message Queuing Protocol). Первоначально оно возникло в финансовой системе и используется для хранения и пересылки сообщений в распределенных системах. Оно хорошо работает с точки зрения простоты использования, масштабируемости и высокой производительности. доступность. . RabbitMQ в основном реализован для достижения двунаправленной развязки между системами. Когда производитель производит большое количество данных, потребитель не может их быстро использовать, поэтому необходим промежуточный слой. Сохраните эти данные.

AMQP, Advanced Message Queuing Protocol, является открытым стандартом для протоколов прикладного уровня, разработанным для промежуточного программного обеспечения, ориентированного на сообщения. Промежуточное программное обеспечение сообщений в основном используется для разделения компонентов, отправителю сообщения не нужно знать о существовании потребителя сообщения, и наоборот. Основными функциями AMQP являются ориентированность на сообщения, организация очередей, маршрутизация (включая двухточечную связь и публикацию/подписку), надежность и безопасность.

RabbitMQ — это реализация AMQP с открытым исходным кодом, серверная часть написана на языке Erlang, поддерживает множество клиентов, таких как: Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP и т. д., поддерживает АЯКС. Он используется для хранения и пересылки сообщений в распределенных системах и хорошо работает с точки зрения простоты использования, масштабируемости и высокой доступности.

3. Введение концепции

При проектировании обычных очередей сообщений обычно существует несколько концепций: производители, потребители и наши очереди. Однако в RabbitMQ добавляется слой посередине, который называется Exchange, таким образом, доставка сообщения не определяется производителем, в какую очередь доставлять, а сообщение доставляется непосредственно на коммутатор, который определяется коммутатором в соответствии с политикой планирования Очередь, в которую должно быть доставлено сообщение. Как показано на рисунке:

  • P слева представляет производителя сообщения
  • Фиолетовый X для переключателя
  • Красная представительная очередь справа

4. Обмен

Так зачем же нам нужен Exchange вместо отправки сообщений прямо в очередь?

Основная идея протокола AMQP заключается в разделении производителей и потребителей, и производители никогда не отправляют сообщения непосредственно в очередь. Производитель обычно не знает, будет ли сообщение отправлено в очередь, он просто отправляет сообщение на биржу. Он сначала принимается Exchange, а затем пересылается Exchange в Queue для хранения в соответствии с определенной политикой.

Когда Exchange получает сообщение, как он узнает, в какую очередь его отправить? Здесь вам нужно понять концепции Binding и RoutingKey:

Binding представляет связь между Exchange и Queue.Мы также можем просто думать, что очередь заинтересована в сообщениях на exchange.Binding может сопровождаться дополнительным параметром RoutingKey. Exchange сопоставляет этот RoutingKey со всеми привязками привязки текущего Exchange.Если соответствие удовлетворено, он отправит сообщение в очередь, привязанную к Exchange, что решает проблему, связанную с отправкой сообщения RabbitMQ один раз, которое можно распространять. в разные очереди. Значение RoutingKey зависит от типа коммутатора.

Давайте рассмотрим три основных типа обмена: Fanout, Direct и Topic.

4.1 Direct Exchange

Direct Exchange — это Exchange по умолчанию RabbitMQ, маршрутизирующий сообщения полностью на основе RoutingKey. При установке Binding Exchange и Queue необходимо указать RoutingKey (обычно это Queue Name) и указать тот же RoutingKey при отправке сообщения, и сообщение будет перенаправлено в соответствующую Queue.

4.2 Topic Exchange

Подобно Direct Exchange, Topic Exchange также должен маршрутизировать сообщения через RoutingKey, разница в том, что Direct Exchange точно соответствует RoutingKey, в то время как Topic Exchange поддерживает нечеткое соответствие. поддержка отдельно*и#подстановочный знак,*Представляет совпадение,#Это означает, что ни одно или несколько слов не совпадают.

4.3 Headers Exchange

Обмен заголовками игнорирует RoutingKey и определяет, к каким очередям следует маршрутизировать, на основе заголовков в сообщении и аргументов, указанных при создании отношения привязки.

Производительность Headers Exchange относительно низкая, и Direct Exchange может полностью заменить его, поэтому использовать его не рекомендуется.

4.4 Default Exchange

Обмен по умолчанию — это особый вид прямого обмена. Когда вы вручную создаете очередь, фон автоматически привязывает очередь к пустому Direct Exchange, а связывающий RoutingKey совпадает с именем очереди. С этим переключателем и привязкой по умолчанию нас интересует только уровень очереди, который больше подходит для некоторых простых приложений.

5. Spring Boot интегрирует RabbitMQ

Интегрировать RabbitMQ со Spring Boot очень просто.Если он просто используется с очень небольшой настройкой, Spring Boot предоставляетspring-boot-starter-amqpПроект для поддержки различных сообщений.

5.1 Простота использования

импортировать зависимости

Листинг кода: spring-boot-rabbitmq/pom.xml***

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Конфигурационный файл application.yml выглядит следующим образом:

Листинг кода: spring-boot-rabbitmq/src/main/resources/application.yml***

server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

конфигурация очереди

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java***

@Configuration
public class QueueConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean
    public Queue simpleOneToMany() {
        return new Queue("simpleOneToMany");
    }
}

поставщик сообщений

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java***

@Component
public class SimpleSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
  
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
        amqpTemplate.convertAndSend("simple", message);
        logger.info("消息推送成功!");
    }
}

потребитель сообщений

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java***

@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String message) {
        logger.info("Receive :{}", message);
    }

}

контрольная работа

Листинг кода: spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java***

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    SimpleSend simpleSend;

    @Test
    public void simpleSend() {
        simpleSend.send();
    }

}

Использование 5,2-ко-многим

Что произойдет, если есть один производитель сообщений, а также потребители сообщений?

Небольшие изменения в приведенном выше коде для добавления потребителя сообщений.

Код теста выглядит следующим образом:

@Test
public void simpleOneSend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
    }
}

Тест показывает, что в результате два потребителя в среднем потребляют сообщения, созданные производителем.

5.3 Использование «многие ко многим»

Давайте добавим еще одного производителя сообщений, и тестовый код выглядит следующим образом:

@Test
public void simpleManySend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
        simpleManySend1.send(i);
    }
}

Тест показывает, что в результате два потребителя в среднем потребляют сообщения, созданные двумя производителями.

5.4 Topic Exchange

Сначала настройте тему, код конфигурации выглядит следующим образом:

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java***

@Configuration
public class TopicConfig {

    private final String message = "topic.message";
    private final String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(this.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(this.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

Здесь очередь queueMessages может одновременно совпадать с двумя ключами route_key, а очередь queueMessage — только с тематикой.message.

Код производителя сообщения выглядит следующим образом:

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java***

@Component
public class TopicSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String message = "message 1";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
    }

    public void send2() {
        String message = "message 2";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
    }
}

Вызовы send1() будут пересылать сообщения в обе очереди Exchange, а вызовы send2() будут пересылаться только в Receive2.

5.5 Fanout Exchange

Fanout — это знакомый нам широковещательный режим или режим подписки, который отправляет сообщение на коммутатор Fanout, и все очереди, привязанные к этому коммутатору, получают это сообщение.

Fanout настраивается следующим образом:

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java***

@Configuration
public class FanoutConfig {
    @Bean
    public Queue MessageA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue MessageB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue MessageC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageC).to(fanoutExchange);
    }
}

Код производителя сообщения выглядит следующим образом:

Листинг кода: spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java***

@Component
public class FanoutSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "Hello FanoutSend.";
        logger.info("send:{}", message);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
    }
}

Код теста выглядит следующим образом:

Листинг кода: spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java***

@Test
public void fanoutSend() {
    fanoutSend.send();
}

Результатом теста является то, что все очереди, привязанные к коммутатору разветвления, получили сообщения.

6. Пример кода

Пример кода — Гитхаб

Пример кода — Gitee

7. Ссылка

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416996

Если моя статья была вам полезна, отсканируйте код и подпишитесь на официальный аккаунт автора: Получите последние новости о галантерейных товарах :)