Это 9-й день моего участия в августовском испытании обновлений, подробности о событии:Испытание августовского обновления
Статьи по Теме
Краткое изложение серии RabbitMQ:Серия RabbitMQ
предисловие
- Мы только что увидели, как поступить в ситуации, когда задача не потеряна, но как гарантировать, что сообщение, отправленное производителем сообщения, не будет потеряно при остановке службы RabbitMQ.
- По умолчанию RabbitMQ игнорирует очереди и сообщения, когда он выходит или аварийно завершает работу по какой-либо причине, если только не указано не делать этого.
- Чтобы сообщения не были потеряны, необходимы две вещи: нам нужно пометить и очередь, и сообщение как долговременные.
1. Сохранение очереди
-
Очереди, которые мы создали ранее, все непостоянны. Если RabbitMQ будет перезапущен, очередь будет удалена. Если вы хотите, чтобы очередь была постоянной, вам нужно установить постоянный параметр на постоянный при объявлении очереди.
-
Перезапускаем RabbitMQ
-
Успешно перезапустите и обновите страницу управления
-
Да, наша очередь потеряна~
-
Очевидно, что в производственной среде мы не можем гарантировать, что сервер никогда не перезапустится и никогда не выключится. Поэтому, если очередь не сохраняется, данные легко теряются.
-
Постоянные данные — это то, что мы должны сделать.
-
Но следует отметить, что если объявленная ранее очередь не является постоянной, необходимо сначала удалить исходную очередь или создать постоянную очередь заново, иначе произойдет ошибка.
-
Исходный код выглядит следующим образом
-
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException { validateQueueNameLength(queue); return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod(); }
-
Изменить код производителя
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_xiaoxiyingda { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ //这是持久化的参数。false不进行持久化,true进行持久化 boolean durable = true; channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,null,message.getBytes()); System.out.println("生产者发出消息" + message); } } }
-
Как упоминалось выше, исходная очередь должна быть удалена, иначе будет сообщено об ошибке.
-
Подробное сообщение об ошибке
-
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'xiaoxiyingda' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
-
-
ОК, удалите исходную очередь и выполните ее снова.
-
Буква D появляется в Features, что доказывает, что очередь успешно сохранена!
-
Попробуйте снова перезапустить RabbitMQ. Экономьте место, не наклеивайте картинки.
-
Идеально! Очередь еще есть! Приятно!
2. Сохранение сообщения
-
Изменить код производителя
-
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */ public class DyProducerTest_xiaoxiyingda { /** * 这里为了方便,我们使用main函数来测试 * 纯属看你个人选择 * @param args */ public static void main(String[] args) throws Exception{ //使用工具类来创建通道 Channel channel = RabbitMqUtils.getChannel(); /** * 生成一个队列 * 1.队列名称 * 2.队列里面的消息是否持久化 默认消息存储在内存中 * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费 * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除 * 5.其他参数 */ //这是持久化的参数。false不进行持久化,true进行持久化 boolean durable = true; channel.queueDeclare(QueueNameConstant.XIAOXIYINGDA_MODEL,durable,false,false,null); /** * 发送一个消息 * 1.发送到那个交换机 * 2.路由的 key 是哪个 * 3.其他的参数信息 * 4.发送消息的消息体 */ Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); //MessageProperties.PERSISTENT_TEXT_PLAIN;这个代表消息持久化到硬盘 channel.basicPublish("",QueueNameConstant.XIAOXIYINGDA_MODEL,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("生产者发出消息" + message); } } }
-
MessageProperties.PERSISTENT_TEXT_PLAIN
-
Посмотрите на исходный код, что это?
-
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { this.basicPublish(exchange, routingKey, false, props, body); }
-
public class MessageProperties { public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public MessageProperties() { } }
-
То есть при отправке сообщения нам нужно записать параметр, и тогда mq сохранит его на диск.
-
-
Впереди долгий путь, и я обязательно буду его искать вдоль и поперёк~
Если вы думаете, что я блогеры хорошо пишу! Писать нелегко, пожалуйста, ставьте лайки, подписывайтесь и комментируйте, чтобы поощрять блоггеров ~ хахах