SimpleMessageListenerContainer — это простой контейнер прослушивателя сообщений.
-
Этот класс очень мощный, мы можем сделать для него множество настроек, и этот класс может удовлетворить все элементы конфигурации для потребителей. Он имеет функции мониторинга одной или нескольких очередей, автозапуска и автодекларации.
-
Он может устанавливать характеристики транзакций, диспетчер транзакций, атрибуты транзакций, параллелизм транзакций, открывать ли транзакцию, сообщения отката и т. д. Однако в реальном производстве мы редко используем транзакции и в основном используем компенсационные механизмы.
-
Он может установить количество потребителей, минимальное и максимальное количество и потребление партии.
-
Он может установить подтверждение сообщения и режим автоматического подтверждения, следует ли вернуться в очередь, а также функцию обработчика захвата исключений.
-
Он может установить стратегию генерации потребительских этикеток, эксклюзивный режим, потребительские атрибуты и т. д.
-
Он также может устанавливать определенные прослушиватели, преобразователи сообщений и многое другое.
Примечание: SimpleMessageListenerContainer может быть установлен динамически, например, работающее приложение может динамически изменять размер числа своих потребителей, режим получения сообщений и т. д.
Многие самонастраиваемые внутренние консоли, основанные на rabbitMQ, также реализуются на основе этого при настройке. Итак, видно, что SpringAMQP очень мощный.
Пример кода:
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
На основе исходного кода RabbitMQConfig из RabbitTemplate из SpringAMQP в предыдущем разделе добавьте
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //监听的队列
container.setConcurrentConsumers(1); //当前的消费者数量
container.setMaxConcurrentConsumers(5); // 最大的消费者数量
container.setDefaultRequeueRejected(false); //是否重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消费端的标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
log.info("----------消费者: " + msg);
}
});
return container;
}
Продолжайте использовать последний код в модульном тесте
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234 --spring.abc".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send! -spring.amqp");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send! -rabbit.abc");
}
Запустите тест, чтобы увидеть журнал, три связанные очереди могут быть использованы
С тех пор было введено простое использование SimpleMessageListenerContainer.В контейнере все еще есть много методов, ожидающих вашего использования, поэтому я не буду продолжать их вводить.