1: Сводный обзор
После изучения предыдущих трех статей промежуточное ПО RabbitMQ должно быть в стадии очистки облаков и наблюдения за небом. Эта статья будет ковать железо, пока горячо, и улучшит последний раздел потребления базового приложения RabbitMQ. Конечно, в статье будет и дальше объясняться подробная информация о распространении сообщений, подтверждении на стороне потребителя и других функциях среднего уровня.
Второе: потребление сообщений
Очередь MQ можно понимать как центр хранения предметов.Когда вы его кладете, вы должны забрать его для использования.Если вы держите его там без процентов, это будет продолжать увеличивать затраты и вызывать ряд проблем. Существует два способа использования сообщений, хранящихся в MQ: отправка службы RabbitMQ, получение клиентом потребителя.
2.1 Вытягивающие сообщения
Извлечение сообщений сервера RabbitMQ через baiscGet() имеет следующие особенности:
一次只能消费一条消息,千万别使用用循环代替后面的baiscConsume()
- Упомянутый ранее параметр создания очереди имеет AutoDelete, но обратите внимание на это
自动删除前提为至少有一个消费者连接到队列
, и удалить, когда все потребители будут отключены, здесьbasicGet()消费不包含在内
// 设置队列自动删除
channel.queueDeclare("autoDeleteQueue", true, false, true, null);
channel.basicPublish("", "autoDeleteQueue", null, "测试".getBytes());
// 验证basicGet不触发队列自动删除
channel.basicGet("autoDeleteQueue", true);
КонечноbasicGet()
API самого метода относительно простое, первый параметр задает очередь потребления, а второй параметр задает, отвечать ли автоматически, а именно AutoAck. Возвращаемый объект также инкапсулируетсяEnvelope
,BasicProperties
,body
Тело сообщения и т. д., конкретная информация показана в следующей таблице:
серийный номер | параметры метода | значение |
---|---|---|
1 | queue | Имя очереди, указывающее очередь потребления потребителя |
2 | autoAck | Автоматический ответ, после того, как открытие станет истинным, сообщение, отправленное приложением RabbitMQ, будет немедленно удалено. |
серийный номер | возвращаемое значение | Примечание |
---|---|---|
1 | envelope | Содержит тег доставки, обмен, ключ маршрутизации и другую информацию. |
2 | props | Объект BasicProperties, то есть свойства объекта, установленные при создании сообщения. |
3 | body | массив байтов тела сообщения |
4 | messageCount | количество сообщений |
2.2 push-сообщения
По сравнению с вытягивающими сообщениями,basicConsume()
Push-сообщения больше соответствуют потребностям производственной среды, а очередь потребления постоянно контролируется. Естественно, его API также более сложный.Часто используемые серии перегруженных параметров показаны в следующей таблице:
серийный номер | параметры метода | значение |
---|---|---|
1 | queue | Имя очереди потребления |
2 | autoAck | Автоматически подтверждать отправку |
3 | consumerTag | Уникальный идентификатор потребителя |
4 | noLocal | Не использовать сообщения, созданные одним и тем же соединением Connection. |
5 | consumer | Конкретно организуйте объекты логики потребления, которые предоставляют ряд перегруженных методов для сборки логики потребления пользователем. |
Использование push-сообщений обычно реализуется в концеConsumer
интерфейс или наследованиеDefaultConsumer
Класс DefaultConsumer реализует интерфейс Consumer, но большинство методов — это пустые реализации, и логику в них нужно переписывать. Время выполнения важных методов показано в следующей таблице:
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理消息逻辑
}
};
channel.basicConsume("queueName",true,"consumerTag",defaultConsumer);
серийный номер | метод | время исполнения |
---|---|---|
1 | handleDelivery() | Логика сообщения о потреблении |
2 | handleConsumeOk() | В первой статье упоминалось, что команда Consume-Ok выполнит этот метод один раз перед командой доставки, то есть до того, как каждый потребитель начнет потреблять |
3 | handleShutdownSignal() | Выполнить один раз, когда соединение Соединение/канал Канал отключен и закрыт |
4 | handleRecoverOk() | Очередь команд baiscRecover() повторно отправляет неподтвержденные сообщения Этот метод выполняется перед повторной отправкой неподтвержденных сообщений. |
5 | handleCancelOk() | baiscCancel() показывает отмену потребителя, укажите этот метод при отмене потребителя |
2.3 Ретрансляция очереди
Упомянутый выше метод ConsumerhandleRecoverOk()
Он будет вызываться при повторной отправке сообщения, и отображаемый метод вызова повторной отправки сообщенияbasicRecover()
. Метод имеет только один параметр:
-
true
: Указывает, что сообщения повторной отправки могут быть отправлены другим потребителям. -
false
: указывает, что сообщения могут быть отправлены только одному и тому же потребителю.
Три: подтверждение сообщения
В первой статье есть команда, которую клиент использует Basic.Ack для сообщения серверу, чтобы подтвердить, что сообщение было использовано нормально.Когда команда получена, сервер RabbityMQ удалит сообщение, чтобы гарантировать, что сообщение не будут потеряны для клиента-потребителя. Естественно, если есть подтверждение, будет и подтверждение отказа.В этом разделе представлены функции basicAck(), basicReject() и basicNack().
3.1 Подтверждение использования basicAck
Обратная связь RabbitMQ подтверждает, что потребление проходит командуbasicAck()
Реализация, метод имеет два параметраdeliveryTag
иmultiple
channel.basicAck(envelope.getDeliveryTag(),false);
- deliveryTag: номер подтверждающего сообщения, который является возрастающим уникальным номером, назначаемым каждому сообщению при его использовании.
- множественное: пакетное подтверждение, true означает, что будут подтверждены все ожидающие сообщения, число которых меньше, чем номер текущего подтверждающего сообщения, false означает, что подтверждается только текущее сообщение
обращать внимание:消息的编码是每个信道Channel范围的,批量确认操作也是针对当前Channel信道的操作。请务必记住这个范围
3.2 Отказаться от подтверждения basicReject
Программа выдает исключение в процессе потребления сообщения, или сообщение необходимо использовать повторно, в это время потребленное сообщение может быть отклонено для подтверждения. Есть два места для сообщений, которые отказываются подтверждать: они удаляются и возвращаются в очередь.requeue
Контроль, сообщение, которое отказывается подтвердить, будет помещено в начало очереди, когда оно будет помещено обратно в очередь, а сообщение, которое отказывается возвращаться в очередь, может быть использовано с очередью недоставленных сообщений.
void basicReject(long deliveryTag, boolean requeue) throws IOException;
3.3 Отказ от подтверждения basicNack
Потребление подтверждений может быть подтверждено пакетами, почему сообщения об отклонении не могут быть отклонены пакетами? Таким образом, чтобы восполнить недостаточность basicReject(),basicNack()
. Этот API имеет на один параметр больше, чем basicReject().multiple
, эффект соответствует пакетному подтверждению
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
Четыре: предварительная выборка сообщений
Стратегия, принятая RabbitMQ для потребления сообщений, представляет собой механизм опроса, который отправляет каждое сообщение уникальному потребителю. Сообщения, полученные каждым потребителем, усредняются, и такой механизм приведет к следующим проблемам:- Некоторые сообщения отправляются слишком долго.После того, как сообщения будут распределены равномерно, у некоторых потребителей может остаться невыполненная работа с сообщениями, в то время как некоторые потребители будут бездействовать, что приведет к снижению пропускной способности системы.
Следующий код может сообщить серверу RabbitMQ, я принимаю толькоprefetchCount
Количество неподтвержденных сообщений. Когда количество неподтвержденных сообщений клиента-потребителя достигает предела, сервер не будет передавать данные потребителю. Смысл второго параметра следующий:
значение параметра | значение |
---|---|
false | По умолчанию, применяется ко всем потребителям на канале |
true | Доступно всем потребителям на канале |
void basicQos(int prefetchCount, boolean global) throws IOException;
Разумная предварительная выборка сообщений в сочетании с механизмом ручного подтверждения ACK потребителем может хорошо оптимизировать и сбалансировать производительность потребителя.Эта проблема количества предварительной выборки может быть сбалансирована в соответствии со скоростью роста сообщений в очереди и эффективностью обработки сообщений потребителем.
Пятое: ПКП
На самом деле относительно просто использовать RabbitMQ для выполнения операций RPC, то есть использовать ранее упомянутыйBasicPeoperties
объект. При отправке сообщения сообщение может нести объект, а объектdeliveryMod
Упорство,priority
приоритет,expiration
Автоматически устаревать удаленные атрибуты. Здесь RPC будет использоватьreplyTo
Атрибут сообщает RPC-серверу, что после выполнения необходимо вызвать адрес очереди.correlationId
Уникальный идентификатор, используемый для идентификации запроса
5.1 RPC-клиент
- UUID генерирует CorrelationId для запроса уникального идентификатора, а клиент подтверждает атрибуцию и собственный обратный вызов запроса при использовании очереди обратного вызова.
- Очередь блокировки ArrayBlockingQueue используется для блокировки основного потока и ожидания обратного вызова после того, как сервер RPC завершит логику.
- Если вы хотите ограничить время ожидания удаленного RPC, вы можете добавить максимальное время ожидания в блокирующий метод ожидания очереди take()
@SneakyThrows
public static void main (String[] args) {
Channel channel = TemplateConfigServiceImpl.createChannel();
// 创建BasicProperties赋值replyTo回调队列名称、correlationId请求唯一标识ID
String correlationId = UUID.randomUUID().toString();
String replyQueue = "queue1";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(replyQueue).correlationId(correlationId).build();
// 客户端向服务端监控队列发送消息
String rpcQueue = "queueName";
channel.basicPublish("",rpcQueue,basicProperties,"RPC测试".getBytes());
// 创建阻塞队列等到消息回调
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
// 监控回调队列消息获取远程调用结果
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 校验消息唯一标识是否匹配
String replyCorrelationId = properties.getCorrelationId();
long deliveryTag = envelope.getDeliveryTag();
if (correlationId.equals(replyCorrelationId)){
// 将回调消息放到阻塞队列中
arrayBlockingQueue.offer(new String(body));
channel.basicAck(deliveryTag,false);
}else {
// 不匹配消息放回队列
channel.basicReject(deliveryTag,true);
}
}
};
channel.basicConsume(replyQueue,false,"ConsumerTag",defaultConsumer);
// 阻塞等待阻塞队列中消息处理后续逻辑
String take = arrayBlockingQueue.take();
System.out.println(take);
}
5.2 RPC-сервер
Общая реализация RPC-сервера и клиента представляет собой простейший велосипедный дизайн. Если вам нужна более сложная логика, сделайте это сами.
@SneakyThrows
public static void main (String[] args) {
Channel channel = TemplateConfigServiceImpl.createChannel();
// 监控RPC队列消息执行任务
String rpcQueue = "queueName";
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 执行计算逻辑
System.out.println("RPC远程服务端开始执行任务");
System.out.println(new String(body));
// 组装回调消息
String replyTo = properties.getReplyTo();
channel.basicPublish("",replyTo,properties,"RPC远程计算任务完成".getBytes());
// 确认消息消费
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag,false);
}
};
channel.basicConsume(rpcQueue,false,"ConsumerTag",defaultConsumer);
}