Это седьмой день моего участия в августовском испытании обновлений, подробности о мероприятии:Испытание августовского обновления
Статьи по Теме
Краткое изложение серии RabbitMQ:Серия RabbitMQ
предисловие
-
Основная идея очереди работ (она же очередь задач) состоит в том, чтобы избежать немедленного выполнения ресурсоемкой задачи и ожидания ее завершения.
-
Вместо этого мы планируем выполнение задачи позже. Мы инкапсулируем задачу в виде сообщения и отправляем в очередь.
-
Рабочий процесс, работающий в фоновом режиме, удалит задачу и в конечном итоге выполнит задание.
-
При наличии нескольких рабочих потоков эти рабочие потоки будут обрабатывать эти задачи вместе.
-
Как будущий старший программист, мы сначала оптимизируем предыдущий простой режим очереди.
- Это наш производитель и потребитель, очевидно, много дублирующего кода.
-
оптимизация
- Сначала извлеките имя очереди
-
public class QueueNameConstant { //简单队列模式 public static final String JIANDAN_MODEL = "dayu"; //工作队列模式 public static final String WORK_MODEL = "work_model"; }
- Извлечь фабрику соединений, установить соединение, установить канал
-
/** * 公用部分创建工具类 *@author DingYongJun *@date 2021/8/1 */ public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("ip地址"); factory.setUsername("admin"); factory.setPassword("111111"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
- Примерно так устроено
-
Приятно! Тогда используйте его, как хотите!
1. Производители
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_02 { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QueueNameConstant.WORK_MODEL,false,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ for (int i=0;i<6;i++){ String message="我是生产者,我告诉你一个好消息!"+i; Thread.sleep( 1000 ); channel.basicPublish("",QueueNameConstant.WORK_MODEL,null,message.getBytes()); System.out.println("消息发送完毕"); } } }
2. Потребители
-
для тестирования
轮训分发消息
, здесь мы создаем двух потребителей для потребления сообщений. -
потребитель А
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_02 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者A,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ Thread.sleep(1000); channel.basicConsume(QueueNameConstant.WORK_MODEL,true,deliverCallback,cancelCallback); } }
-
Потребительский код B остался прежним, больше не нужно вставлять его, чтобы занимать место!
-
Запустите обоих потребителей A и B
-
Запустите производителя, отправьте шесть сообщений
-
потребитель А
-
потребитель Б
-
Результат очевиден, один для вас, один для меня и один для всех очень упорядоченно! Не было ни грабежа, ничего. Хахаха
3. Резюме
-
Распределение с опросом заключается в отправке сообщений в очереди сообщений всем потребителям по очереди. Сообщение может быть получено только одним потребителем.
-
Функции
- Сообщение может быть получено только одним потребителем.
- Очередь использует циклический метод для равномерной отправки сообщений потребителям.
- Потребители не получат следующее сообщение, пока не обработают определенное сообщение.
-
производственная сторона
- объявить очередь
- Создать соединение
- Создать канал
- очередь объявления канала
- сформулировать сообщение
-
потребитель
- объявить очередь
- Создать соединение
- Создать канал
- очередь объявления канала
- Переопределить метод потребления сообщений
- выполнить метод сообщения
Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~
Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах