Режим простой очереди (точка-точка) | Серия RabbitMQ (1)

задняя часть RabbitMQ
Режим простой очереди (точка-точка) | Серия RabbitMQ (1)

Это 6-й день моего участия в августовском испытании обновлений.Подробности о мероприятии:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Что такое очередь сообщений?

    • Сообщение (Message) относится к данным, передаваемым между приложениями. Сообщения могут быть очень простыми, например, содержать только текстовые строки, или более сложными, возможно, содержащими встроенные объекты.
    • Очередь сообщений (Message Queue) — это метод связи между приложениями.После того как сообщение отправлено, оно может быть немедленно возвращено, а система сообщений может обеспечить надежную доставку сообщения. Издателю сообщения нужно только опубликовать сообщение в MQ, и ему все равно, кто его получит, а потребителю сообщения нужно только получить сообщение от MQ, независимо от того, кто его публикует. Таким образом, ни издатель, ни пользователь не должны знать о существовании друг друга.
  • Зачем использовать очереди сообщений?

    • Основная причина в том, что в среде с высокой степенью параллелизма запросы часто блокируются из-за отсутствия времени на синхронную обработку, например, в MySQL одновременно поступает большое количество запросов на вставку, обновление и другие запросы, что напрямую приводит к бесчисленные блокировки строк и таблиц, и даже окончательный запрос будет заблокирован.Накопление слишком много, что вызывает ошибку слишком много соединений. Используя очереди сообщений, мы можем обрабатывать запросы асинхронно, что снижает нагрузку на систему.
  • Особенности RabbitMQ

    • Надежность
      • RabbitMQ использует некоторые механизмы для обеспечения надежности, такие как постоянство, подтверждение передачи и подтверждение публикации.
    • Гибкая маршрутизация
      • Сообщения направляются через Exchange до того, как они попадут в очередь. Для типичной функциональности маршрутизации RabbitMQ уже предоставляет некоторые встроенные возможности Exchange для реализации. Для более сложных функций маршрутизации несколько обменов могут быть связаны вместе, а их собственные обмены также могут быть реализованы с помощью механизма подключаемых модулей.
    • Кластеры сообщений (кластеризация)
      • Несколько серверов RabbitMQ могут образовывать кластер для формирования логического брокера.
    • Высокодоступные очереди
      • Очереди можно зеркально отразить на компьютерах в кластере, чтобы очереди оставались доступными в случае сбоя некоторых узлов.
    • Мультипротокол
      • RabbitMQ поддерживает различные протоколы очередей сообщений, такие как STOMP, MQTT и другие.
    • Много клиентов
      • RabbitMQ поддерживает практически все распространенные языки, такие как Java, .NET, Ruby и т. д.
    • Интерфейс управления
      • RabbitMQ предоставляет простой в использовании пользовательский интерфейс, который позволяет пользователям отслеживать и управлять многими аспектами брокера сообщений.
    • Отслеживание
      • Если сообщение является ненормальным, RabbitMQ предоставляет механизм отслеживания сообщения, потребитель может узнать, что произошло.
    • Система плагинов (Система плагинов)
      • RabbitMQ предоставляет множество плагинов, которые можно расширять разными способами, или вы можете написать свои собственные плагины.
  • Концепция вещей звучит загадочно, что ж, давайте начнем наше путешествие по изучению RabbitMQ с самого простого режима.

Простой режим очереди (одноранговый режим)

  • Создайте новый проект maven, примерная структура каталогов выглядит следующим образом.

    • image-20210802131330986.png
  • POM импорт JAR.

    • <!--指定 jdk 编译版本-->
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <configuration>
                          <source>8</source>
                          <target>8</target>
                      </configuration>
                  </plugin>
              </plugins>
          </build>
          <dependencies>
              <!--rabbitmq 依赖客户端-->
              <dependency>
                  <groupId>com.rabbitmq</groupId>
                  <artifactId>amqp-client</artifactId>
                  <version>5.8.0</version>
              </dependency>
              <!--操作文件流的一个依赖-->
              <dependency>
                  <groupId>commons-io</groupId>
                  <artifactId>commons-io</artifactId>
                  <version>2.6</version>
              </dependency>
          </dependencies>
      

    1. Производители

    • package com.dy.producer;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      /**
       * 这是一个测试的生产者
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class DyProducerTest_01 {
          public static final String Queue_name = "dayu";
      
          /**
           * 这里为了方便,我们使用main函数来测试
           * 纯属看你个人选择
           * @param args
           */
          public static void main(String[] args) throws Exception{
              //创建一个连接工厂
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("120.48.29.41");
              factory.setUsername("admin");
              factory.setPassword("111111");
      
              //创建一个新连接
              Connection connection = factory.newConnection();
              //创建一个通道 channel 实现了自动 close 接口 自动关闭 不需要显示关闭
              Channel channel = connection.createChannel();
      
              /**
               * 生成一个队列
               * 1.队列名称
               * 2.队列里面的消息是否持久化 默认消息存储在内存中
               * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
               * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
               * 5.其他参数
               */
              channel.queueDeclare(Queue_name,false,false,false,null);
              String message="我是生产者,我告诉你一个好消息!";
      
              /**
               * 发送一个消息
               * 1.发送到那个交换机
               * 2.路由的 key 是哪个
               * 3.其他的参数信息
               * 4.发送消息的消息体
               */
              channel.basicPublish("",Queue_name,null,message.getBytes());
              System.out.println("消息发送完毕");
          }
      }
      
    • Выполните его, чтобы увидеть, можно ли успешно отправить сообщение в очередь.

      • image-20210802131620133.png
    • Посетите страницу управления RabbitMQ.

      • image-20210802131719159.png
    • Докажите, что производитель был успешно установлен и успешно отправил сообщение.

2. Потребители

  • package com.dy.consumer;
    
    import com.rabbitmq.client.*;
    
    /**
     * 这是一个测试的消费者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyConsumerTest_01 {
        public static final String Queue_name = "dayu";
    
        public static void main(String[] args) throws Exception{
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("120.48.29.41");
            factory.setUsername("admin");
            factory.setPassword("111111");
    
            //建立连接
            Connection connection = factory.newConnection();
            //建立通道
            Channel channel = connection.createChannel();
    
            System.out.println("我是消费者,我在等待接收消息!");
            DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                String message= new String(var2.getBody());
                System.out.println(message);
            };
            CancelCallback cancelCallback = (String var1)->{
                System.out.println("消息消费被中断");
            };
    
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
             * 3.消费者未成功消费的回调
             */
            channel.basicConsume(Queue_name,true,deliverCallback,cancelCallback);
        }
    }
    
  • Объяснение, DeliverCallback и CancelCallback

    • Посмотрите, какие параметры необходимы

    • image-20210802132008385.png

    • Взгляните на DeliverCallback, который является типичным функциональным интерфейсом, поэтому мы можем использовать лямбда-выражения для создания его объектов.

      • import java.io.IOException;
        
        @FunctionalInterface
        public interface DeliverCallback {
            void handle(String var1, Delivery var2) throws IOException;
        }
        
    • CancelCallback — то же самое

      • import java.io.IOException;
        
        @FunctionalInterface
        public interface CancelCallback {
            void handle(String var1) throws IOException;
        }
        
  • Запустите потребителя, чтобы увидеть, может ли он получить сообщение из указанной очереди.

    • image-20210802132237803.png
    • Сообщение было успешно использовано.
  • Посетите страницу управления RabbitMQ.

    • image-20210802132334611.png
    • Сообщение равно нулю, что доказывает, что сообщение было успешно использовано!

3. Резюме

  • Простейшая рабочая очередь с одним производителем сообщений, одним потребителем сообщений и одной очередью. также известен как点对点模式.

  • 111.png

  • Грубые шаги продюсера

    • присоединиться
    • Создать канал
    • Создать объявление очереди
    • отправлять сообщения
    • закрыть очередь
  • Общие шаги для потребителей

    • присоединиться
    • получить канал
    • очередь мониторинга
    • Использование сообщений
  • Что ж, после прочтения этого простого режима очереди у меня есть общее представление о публикации и подписке mq!


Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах