Режим рабочей очереди (очередь задач) | Серия RabbitMQ (2)

задняя часть RabbitMQ
Режим рабочей очереди (очередь задач) | Серия RabbitMQ (2)

Это седьмой день моего участия в августовском испытании обновлений, подробности о мероприятии:Испытание августовского обновления


Статьи по Теме

Краткое изложение серии RabbitMQ:Серия RabbitMQ


предисловие

  • Основная идея очереди работ (она же очередь задач) состоит в том, чтобы избежать немедленного выполнения ресурсоемкой задачи и ожидания ее завершения.

  • Вместо этого мы планируем выполнение задачи позже. Мы инкапсулируем задачу в виде сообщения и отправляем в очередь.

  • Рабочий процесс, работающий в фоновом режиме, удалит задачу и в конечном итоге выполнит задание.

  • При наличии нескольких рабочих потоков эти рабочие потоки будут обрабатывать эти задачи вместе.

  • Как будущий старший программист, мы сначала оптимизируем предыдущий простой режим очереди.

    • Это наш производитель и потребитель, очевидно, много дублирующего кода.
    • image-20210802140857994.png
  • оптимизация

    • Сначала извлеките имя очереди
    • public class QueueNameConstant {
      ​
          //简单队列模式
          public static final String JIANDAN_MODEL = "dayu";
      ​
          //工作队列模式
          public static final String WORK_MODEL = "work_model";
      }
      
    • Извлечь фабрику соединений, установить соединение, установить канал
    • /**
       * 公用部分创建工具类
       *@author DingYongJun
       *@date 2021/8/1
       */
      public class RabbitMqUtils {
          //得到一个连接的 channel
          public static Channel getChannel() throws Exception {
              //创建一个连接工厂
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("ip地址");
              factory.setUsername("admin");
              factory.setPassword("111111");
              Connection connection = factory.newConnection();
              Channel channel = connection.createChannel();
              return channel;
          }
      }
      
    • Примерно так устроено
    • image-20210802141547690.png
  • Приятно! Тогда используйте его, как хотите!

1. Производители

  • /**
     * 这是一个测试的生产者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyProducerTest_02 {
        /**
         * 这里为了方便,我们使用main函数来测试
         * 纯属看你个人选择
         * @param args
         */
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    ​
            /**
             * 生成一个队列
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QueueNameConstant.WORK_MODEL,false,false,false,null);
    ​
    ​
            /**
             * 发送一个消息
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            for (int i=0;i<6;i++){
                String message="我是生产者,我告诉你一个好消息!"+i;
                Thread.sleep( 1000 );
                channel.basicPublish("",QueueNameConstant.WORK_MODEL,null,message.getBytes());
                System.out.println("消息发送完毕");
            }
        }
    ​
    }
    

2. Потребители

  • для тестирования轮训分发消息, здесь мы создаем двух потребителей для потребления сообщений.

  • потребитель А

  • /**
     * 这是一个测试的消费者
     *@author DingYongJun
     *@date 2021/8/1
     */
    public class DyConsumerTest_02 {
    ​
        public static void main(String[] args) throws Exception{
            //使用工具类来创建通道
            Channel channel = RabbitMqUtils.getChannel();
    ​
            System.out.println("我是消费者A,我在等待接收消息!");
            DeliverCallback deliverCallback = (String var1, Delivery var2)->{
                String message= new String(var2.getBody());
                System.out.println(message);
            };
            CancelCallback cancelCallback = (String var1)->{
                System.out.println("消息消费被中断");
            };
    ​
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
             * 3.消费者未成功消费的回调
             */
            Thread.sleep(1000);
            channel.basicConsume(QueueNameConstant.WORK_MODEL,true,deliverCallback,cancelCallback);
        }
    }
    
  • Потребительский код B остался прежним, больше не нужно вставлять его, чтобы занимать место!

  • Запустите обоих потребителей A и B

    • image-20210802150012042.png
  • Запустите производителя, отправьте шесть сообщений

    • image-20210802150050717.png
  • потребитель А

    • image-20210802150119853.png
  • потребитель Б

    • image-20210802150129998.png
  • Результат очевиден, один для вас, один для меня и один для всех очень упорядоченно! Не было ни грабежа, ничего. Хахаха

3. Резюме

  • 222.png

  • Распределение с опросом заключается в отправке сообщений в очереди сообщений всем потребителям по очереди. Сообщение может быть получено только одним потребителем.

  • Функции

    • Сообщение может быть получено только одним потребителем.
    • Очередь использует циклический метод для равномерной отправки сообщений потребителям.
    • Потребители не получат следующее сообщение, пока не обработают определенное сообщение.
  • производственная сторона

    • объявить очередь
    • Создать соединение
    • Создать канал
    • очередь объявления канала
    • сформулировать сообщение
  • потребитель

    • объявить очередь
    • Создать соединение
    • Создать канал
    • очередь объявления канала
    • Переопределить метод потребления сообщений
    • выполнить метод сообщения

Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~

Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах