Это 10-й день моего участия в августовском испытании обновлений. Узнайте подробности события:Испытание августовского обновления
Статьи по Теме
Краткое изложение серии RabbitMQ:Серия RabbitMQ
предисловие
-
В самом начале мы изучили циклическое распределение, которое RabbitMQ использует для распространения сообщений, но эта стратегия не очень хороша в определенных сценариях.
-
Например, есть два потребителя, обрабатывающие задачи. Один из потребителей A обрабатывает задачи очень быстро, а скорость обработки другого потребителя B очень низкая. В настоящее время мы все еще используем циклическое распределение. Быстрый потребитель бездействует большую часть времени, в то время как медленный потребитель всегда работает.
-
Этот метод распределения на самом деле не очень хорош в данном случае, но RabbitMQ не знает об этой ситуации и все равно распределяет справедливо.
-
В настоящее время мы можем использовать несправедливое распределение для достижения этого, что является способом увеличения труда для тех, кто способен! Вы можете сделать это, о, тогда вы можете сделать больше работы. Я больше ем овощи, поэтому меньше работаю.
1. Несправедливое распределение
-
режиссер
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_xiaoxiyingda { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ //这是持久化的参数。false不进行持久化,true进行持久化 boolean durable = true; channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); //MessageProperties.PERSISTENT_TEXT_PLAIN;这个代表消息持久化到硬盘 channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("生产者发出消息" + message); } } }
-
-
потребитель
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda01 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者A,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); //true 代表批量应答 channel 上未应答的消息 false 单条应答 boolean multiple = false; channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; //不公平分发 int prefetchCount = 1; channel.basicQos(prefetchCount); /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); } }
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda02 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者B,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); //true 代表批量应答 channel 上未应答的消息 false 单条应答 boolean multiple = false; channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; //不公平分发 int prefetchCount = 1; channel.basicQos(prefetchCount); /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); } }
-
Результаты
-
По результатам можно сделать некоторые выводы
- Очевидно, что A выполнил 7, а B выполнил 3.
- Оба потребителя создали несправедливую модель распределения.
- Когда потребитель A эффективен (то есть выполняется за одну секунду), потребитель B работает медленно (выполняется за три секунды).
- MQ автоматически определит, кто работает быстрее всех, а затем выделит больше тем, кто быстр.
- По сути, это не отдых. Когда вы закончите, идите вперед и получите новые миссии!
- Ослики производственной бригады не посмеют так протискиваться! ха-ха~
-
Это означает, что если я не закончил это задание или не ответил вам, пожалуйста, не давайте его мне, я могу выполнить только одно задание в данный момент.
-
Затем rabbitmq назначит задачу неактивному потребителю, который не так занят, конечно, если все потребители не выполнили поставленную задачу.
-
В очередь постоянно добавляются новые задачи, и очередь может столкнуться с ситуацией, что очередь переполнена, в это время можно добавлять только новых воркеров или менять стратегию других задач хранения.
-
Код установки:
-
//不公平分发 int prefetchCount = 1; channel.basicQos(prefetchCount);
-
//源码为证 public void basicQos(int prefetchCount) throws IOException { this.basicQos(0, prefetchCount, false); }
-
Значение int по умолчанию равно 0. Это не нужно объяснять. То есть, если мы его не установим, значение равно нулю.
-
Если установлено значение 1, это доказывает, что режим недобросовестного распределения включен.
-
-
2. Распределение значений предварительной выборки
-
Отправка самого сообщения является асинхронной, поэтому в любой момент на канале должно быть более одного сообщения, и ручное подтверждение от потребителя также носит асинхронный характер.
-
Итак, здесь есть буфер неподтвержденных сообщений, поэтому я надеюсь, что разработчики смогут ограничить размер этого буфера, чтобы избежать проблемы неограниченного количества неподтвержденных сообщений в буфере.
-
Конечно, это сделает пропускную способность очень низкой, особенно если задержка клиентских соединений велика, особенно в средах с высокой задержкой потребительских соединений. Немного более высокое значение будет оптимальным для большинства приложений.
-
Производитель может быть последовательным с приведенным выше примером. Нет необходимости менять
-
потребитель
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda01 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者A,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); //true 代表批量应答 channel 上未应答的消息 false 单条应答 boolean multiple = false; channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; //不公平分发 int prefetchCount = 5; channel.basicQos(prefetchCount); /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); } }
-
Код потребителя B такой же, то есть время сна больше, а значение prefetchCount установлено равным 3, что удобно видеть разницу.
-
-
Результаты
-
Из вышеприведенных результатов можно сделать вывод, что
- prefetchCount, когда для этого значения установлено значение 3, это означает, что в текущем канале есть не более трех входящих каналов, и еще больше будет поставлено в очередь снаружи.
- То есть, когда мы отправили восемь частей данных, у A было 5, а у B было 3. Когда есть еще один, тот, кто съест первый, съест новый.
- Если в очереди всегда есть данные. Тогда потребители АВ всегда будут дополняться новостями. Количество сообщений, которые сохранят полное значение prefetchCount в канале.
Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~
Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах