Предположим сценарий, потому что все наши потребители внезапно становятся недоступными, в результате чего на сервере rabbitMQ остаются десятки тысяч необработанных сообщений.Если в это время ничего не сделать, просто открыть клиент-потребитель, это приведет к огромному количеству всех сообщений передаются в одно мгновение, но наш единственный клиент не может обрабатывать столько данных одновременно, что приведет к тому, что потребитель станет огромной картой, которая может сразу выйти из строя и стать недоступной. Поэтому в реальном производстве очень важна защита от ограничения тока.
RabbitMQ предоставляет функцию qos (Quality of Service), то есть в предпосылке неавтоматического подтверждения сообщений, если определенное количество сообщений (путем установки значения QOS на основе потребления или канала) не подтверждены, новые сообщения не будет потребляться. Код ключа находится в заявленном коде потребителя
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
-
размер предварительной выборки: 0
-
prefetchCount: укажет RabbitMQ не отправлять более N сообщений потребителю одновременно, то есть, как только будет получено N сообщений без подтверждения, потребитель будет заблокирован до тех пор, пока не будет получено подтверждение сообщения.
-
global: true, false независимо от того, применяются ли вышеуказанные настройки к каналу, короче говоря, указанный выше предел — это уровень канала или уровень потребителя
Примечания: prefetchSize и global не реализованы rabbitmq и пока не будут изучаться. Обратите внимание, что prefetchCount действует только при no_ask=false, то есть эти два значения не действуют в случае автоматического ответа.
Демонстрация кода:
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
Производственный код практически не изменился, изменились только exchange и routingKey.
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ limit Message";
for(int i = 0; i < 5; i ++){
log.info("生产端发送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
Код потребителя необходимо изменитьautoAck имеет значение false **
Добавлено ** channel.basicQos(0, 1, false);
Полный потребительский код выглядит следующим образом
/**
* 使用自定义消费者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "qos.#";
public static final String ROUTING_KEY = "qos.save";
public static final String QUEUE_NAME = "test_qos_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
/**
* prefetchSize:0
prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,限速N个
即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack 回来,你再发送 N 个过来
global:true\false 是否将上面设置应用于channel级别,false是consumer级别
prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究
*/
channel.basicQos(0, 1, false);
//使用自定义消费者
//1 限流方式 第一件事就是 autoAck设置为 false
//使用自定义消费者
channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
log.info("消费端启动成功");
}
}
пользовательский потребитель
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消费者标签
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//一定要手动ACK回去
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
Затем запустите потребителя, перейдите в консоль, чтобы увидеть, сгенерированы ли test_qos_exchange и test_qos_queue.
Убедитесь, что test_qos_queue привязан к test_qos_exchange. Запустите производителя для отправки 5 сообщений Было обнаружено, что потребитель напечатал только одно сообщение. Также см. в общей сложности 5 сообщений с консоли, 4 ожидаются и одно используется, но подтверждение не возвращается.Измените код в пользовательском потребителе следующим образом.public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消费者标签
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//一定要手动ACK回去
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
Перезапустите потребителя, посмотрите, что потребитель потребляет один за другим, а ACK возвращается
Как показано выше, это простая стратегия ограничения тока на стороне потребителя RabbitMQ.