Контейнер сообщений SpringAMQP — SimpleMessageListenerContainer

RabbitMQ

SimpleMessageListenerContainer — это простой контейнер прослушивателя сообщений.

  • Этот класс очень мощный, мы можем сделать для него множество настроек, и этот класс может удовлетворить все элементы конфигурации для потребителей. Он имеет функции мониторинга одной или нескольких очередей, автозапуска и автодекларации.

  • Он может устанавливать характеристики транзакций, диспетчер транзакций, атрибуты транзакций, параллелизм транзакций, открывать ли транзакцию, сообщения отката и т. д. Однако в реальном производстве мы редко используем транзакции и в основном используем компенсационные механизмы.

  • Он может установить количество потребителей, минимальное и максимальное количество и потребление партии.

  • Он может установить подтверждение сообщения и режим автоматического подтверждения, следует ли вернуться в очередь, а также функцию обработчика захвата исключений.

  • Он может установить стратегию генерации потребительских этикеток, эксклюзивный режим, потребительские атрибуты и т. д.

  • Он также может устанавливать определенные прослушиватели, преобразователи сообщений и многое другое.

    Примечание: SimpleMessageListenerContainer может быть установлен динамически, например, работающее приложение может динамически изменять размер числа своих потребителей, режим получения сообщений и т. д.

Многие самонастраиваемые внутренние консоли, основанные на rabbitMQ, также реализуются на основе этого при настройке. Итак, видно, что SpringAMQP очень мощный.

Пример кода:

代码地址:  https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 项目下

На основе исходного кода RabbitMQConfig из RabbitTemplate из SpringAMQP в предыдущем разделе добавьте

    @Bean   //connectionFactory 也是要和最上面方法名保持一致
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    //监听的队列
        container.setConcurrentConsumers(1);    //当前的消费者数量
        container.setMaxConcurrentConsumers(5); //  最大的消费者数量
        container.setDefaultRequeueRejected(false); //是否重回队列
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); //签收模式
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    //消费端的标签策略
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });


        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                log.info("----------消费者: " + msg);
            }
        });
        return container;
    }

Продолжайте использовать последний код в модульном тесте

    @Test
    public void testSendMessage2() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234   --spring.abc".getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.abc", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!   -spring.amqp");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!  -rabbit.abc");
    }

Запустите тест, чтобы увидеть журнал, три связанные очереди могут быть использованы

С тех пор было введено простое использование SimpleMessageListenerContainer.В контейнере все еще есть много методов, ожидающих вашего использования, поэтому я не буду продолжать их вводить.