Это 8-й день моего участия в августовском испытании обновлений.Подробности о событии:Испытание августовского обновления
Статьи по Теме
Краткое изложение серии RabbitMQ:Серия RabbitMQ
предисловие
- Несколько вопросов, над которыми следует подумать, прежде чем начинать отвечать на сообщение
- Потребителям может потребоваться некоторое время для завершения задачи, что произойдет, если один из потребителей обрабатывает длинную задачу и только частично завершает ее, а затем внезапно умирает?
- Как только RabbitMQ доставляет сообщение потребителю, он немедленно помечает сообщение для удаления. В этом случае внезапно умирает потребитель, и мы теряем сообщение, которое обрабатывали. и последующие сообщения, отправленные потребителю, потому что он не мог их получить.
- Чтобы гарантировать, что сообщение не будет потеряно в процессе отправки, rabbitmq вводит механизм ответа на сообщение, а ответ на сообщение
- После того, как потребитель получит сообщение и обработает его, он сообщает RabbitMQ, что оно было обработано, и RabbitMQ может удалить сообщение.
1. Автоматический ответ
- Сообщение считается успешно доставленным сразу после его отправки.Этот режим требует компромисса между высокой пропускной способностью и безопасностью передачи данных, так как в этом режиме при наличии соединения на стороне потребителя или закрытии канала до сообщение получено, затем сообщение потеряно.
- Конечно, с другой стороны, потребители в этом режиме могут доставлять перегруженные сообщения, а количество доставляемых сообщений не ограничено.Конечно, это может привести к тому, что потребители будут получать слишком много сообщений, которые слишком поздно обрабатывать. , что приводит к задержке этих сообщений, в конечном итоге к нехватке памяти, и в конечном итоге эти потребительские потоки уничтожаются операционной системой.
- Таким образом, этот шаблон подходит для использования только в тех случаях, когда потребитель может эффективно и с определенной скоростью обрабатывать эти сообщения.
①, продюсер
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_xiaoxiyingda { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ for (int i=0;i<6;i++){ String message="我是生产者,我告诉你一个好消息!"+i; Thread.sleep( 1000 ); channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes()); System.out.println("消息发送完毕"); } } }
②, потребители
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda01 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者A,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ Thread.sleep(1000); channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,true,deliverCallback,cancelCallback); } }
③、Тест
-
Результаты
-
в заключении
-
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { return this.basicConsume(queue, autoAck, "", this.consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); }
-
Когда autoAck истинно, это автоматический ответ
-
Когда autoAck ложно, это ручной ответ
-
-
Не рекомендуется использовать автоматический ответ. В реальных бизнес-сценариях мы обычно используем ручной ответ.
2. Ручной ответ
-
режиссер
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_xiaoxiyingda { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,false,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes()); System.out.println("生产者发出消息" + message); } } }
-
-
потребитель А
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda01 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者A,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); //true 代表批量应答 channel 上未应答的消息 false 单条应答 boolean multiple = false; channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); } }
-
Установите ручной ответ и уменьшите время ожидания, что означает, что бизнес обрабатывается очень быстро.
-
-
потребитель Б
-
/** * 这是一个测试的消费者 *@author DingYongJun *@date 2021/8/1 */ public class DyConsumerTest_xiaoxiyingda02 { public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); System.out.println("我是消费者B,我在等待接收消息!"); DeliverCallback deliverCallback = (String var1, Delivery var2)->{ String message= new String(var2.getBody()); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(message); //true 代表批量应答 channel 上未应答的消息 false 单条应答 boolean multiple = false; channel.basicAck(var2.getEnvelope().getDeliveryTag(),multiple); }; CancelCallback cancelCallback = (String var1)->{ System.out.println("消息消费被中断"); }; /** * 消费者消费消息 * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 * 3.消费者未成功消费的回调 */ channel.basicConsume(QueueNameConstant.XIAOXIYINGDA_MODEL,false,deliverCallback,cancelCallback); } }
-
Установка ручного ответа и установка более длительного времени ожидания означает более медленную обработку услуг.
-
-
исполнительный лист
- 1. Консоль отправляет два сообщения соответственно
- 2. Теоретически у каждого из A и B есть сообщение. Если B отключится до его обработки в это время, что произойдет с сообщением?
-
Результаты
3. Резюме
- Правда и ложь множественного представляют разные значения
- ture означает пакетный ответ, который находится в единицах каналов.Например, если вы заполните один, я отвечу все завершено. Эффективно, но не безопасно.
- ложь означает один ответ, полный ответ я отвечаю один. Безопаснее и надежнее. Но менее эффективен.
- Сообщения автоматически помещаются в очередь
- Если потребитель по какой-либо причине теряет соединение (его канал закрыт, соединение закрыто или соединение TCP потеряно), из-за чего сообщение не отправляет подтверждение ACK, RabbitMQ узнает, что сообщение не было полностью обработано, и будет повторно поставить его в очередь.
- Если другие потребители могут справиться с этим в этот момент, он вскоре перераспределит его другому потребителю. Это гарантирует, что никакие сообщения не будут потеряны, даже если потребитель время от времени умирает.
Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~
Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах