Расширенное ограничение текущего сообщения RabbitMQ и очередь задержки

Java
Расширенное ограничение текущего сообщения RabbitMQ и очередь задержки

Жизнь в конце концов станет одиночным путешествием, до одиночества — смятение, после одиночества — рост.

клин

Эта статья представляет собой очередь сообщенийRabbitMQпятая пуля.

Я собирался поговорить о предыдущемRabbitMQНекоторое расширенное использование:

  • Как обеспечить достоверность сообщения?
  • Как ограничить поток очереди сообщений?
  • Как настроить отложенную очередь для отложенного потребления?

В конце концов, из-за нехватки места в прошлой статье речь шла только о如何保证消息的可靠性?, эта статья закончит оставшиеся две, эта статья также может бытьRabbitMQПоследняя серия~

Знаний, о которых я рассказал, в принципе достаточно для моей работы, и я надеюсь, что вы их хорошо усвоите.

После того, как старые ямы будут заполнены, новые ямы могут открываться медленно.В то же время, поскольку мне нужно подготовиться к экзамену до середины сентября, обновление статьи может быть медленнее, но в понедельник это минимум.Я надеюсь, что все Потерпи.


Желаю вам хорошего урожая, как сначала, так и потом смотреть, и иметь бесконечное счастье.

Код для этой статьи: Адрес облака кодаАдрес GitHub

1. 🔍Как ограничить поток сообщений в очереди?

Ограничение тока очереди сообщений относится к экстренной мере самозащиты, когда сервер сталкивается с огромным объемом трафика.

Поскольку огромный трафик представляет собой большое количество сообщений, если эти сообщения слишком велики для обработки сервером, сервер будет парализован, что повлияет на взаимодействие с пользователем и вызовет неблагоприятные последствия.

Следовательно, требуется операция перехода на более раннюю версию, чтобы изолировать необработанный трафик от системы и предотвратить его нарушение работы системы.

По сути, любая очередь сообщений имеет ограниченную функцию потока.Сегодня мы рассмотримRabbitMQЧто нужно сделать, чтобы ограничить ток?

RabbitMQ предоставляетQOS(Обеспечение качества обслуживания), то есть при условии неавтоматического подтверждения сообщений, если определенное количество сообщений не было подтверждено потреблением, новые сообщения не будут потребляться.


spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 手动确认消息
    listener:
      simple:
          acknowledge-mode: manual
          prefetch: 2

Нам просто нужно настроитьrabbitmq.listener.simpleвнизprefetchАтрибутов достаточно.Для удобства демонстрации я настроил их здесь на два.Семантика такова: если в очереди больше двух неподписанных сообщений, потребление нового сообщения производиться не будет.

Я отправляю три сообщения в свою очередь без подписи, чтобы увидеть эффект:

消息限流演示01

После отправки он показывает, что в нашей системе есть три сообщения Ready, что означает, что эти три сообщения все еще находятся в очереди, и нет потребителя, который мог бы их использовать.

В это время я открыл потребительскую сторону для потребления, но еще не расписался в получении, а затем посмотрел на эффект:

消息限流演示02

unacked=2, ready=1, что означает, что два сообщения получены на сервере, но не подписаны, и одно сообщение все еще находится в очереди и не отправлено на сервер, потому что мы установилиprefetch=2, так что теперь максимальное количество сообщений, одновременно потребляемых очередью, равно 2. Таким образом, мы выполнили ограничение текущего потребления.

Tip: Таким образом, сообщение должно быть подписано вручную, потому что в предыдущей статье мы говорили о том, что автоматически подписывается сообщение, достигающее стороны потребителя, которое будет подписано, может не запускаться наша бизнес-логика. маленький знак следующего состояния.

2. 📑Консоль RabbitMQ

На моих снимках экрана в предыдущем разделе вы можете видеть, что есть визуальный интерфейс.В прошлом мои снимки экрана обычно представляли собой интерфейсы командной консоли DOS, но на самом делеRabbitMQЭто плагин с визуальным интерфейсом, нам нужно только открыть его.

в нашемRabbitMQВведите следующую команду в:rabbitmq-plugins.bat enable rabbitmq_management

Вы можете открыть страницу визуализации, а затем посетить:http://localhost:15672/

可视化页面01

Имя пользователя и пароль по умолчанию гостевые, вы можете войти напрямую.

可视化页面02

Очень удобная консоль, можете попробовать сами~

3. 📔TTL сообщение/очередь

TTLЭто сокращение от Time To Live, что означает время жить.RabbitMQОн поддерживает время истечения срока действия сообщения, которое можно указать при отправке сообщения. Он также поддерживает время истечения срока действия очереди. Оно рассчитывается с момента поступления сообщения в очередь. превышена очередь, сообщение будет автоматически удалено.

Если очередь установлена, срок действия сообщений во всей очереди истечет, когда придет время, а если задано сообщение, одно сообщение автоматически истечет, когда оно прибудет.

    // TTL队列示例
    @Bean
    public Queue ttlQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 设置3s过期
        arguments.put("x-message-ttl",3000);
        return new Queue("topicQueue1",false,false,false, arguments);
    }

Приведенный выше код демонстрирует, как создать очередь TTL, вам нужно указать параметры для воспроизведения, другие параметры в структуре очереди я заполнил FALSE напрямую.

    public void sendTtl() {
        String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();

        System.out.println("Message content : " + message);

        // 设置过期3s
        MessageProperties props = MessagePropertiesBuilder.newInstance()
                .setExpiration("3000").build();

        rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
        System.out.println("消息发送完毕。");
    }

Установка TTL сообщения также является параметром настройки.

ВышеупомянутоеRabbitMQОчки знаний о TTL в .

4. 📌Очередь недоставленных писем DLX

DLX死信队列Хотя это и называется очередью, на самом деле этоExchange, или ссылаясь наExchangeи это принадлежитQueue, вместе они образуют очередь недоставленных сообщений.

Когда сообщение:

  • Потребление отклонено (basic.reject/basic.nack) с requeue=false
  • Срок жизни истек
  • Очередь для ввода достигла максимальной длины

В этих трех случаях можно определить, что сообщение мертво.Если мы не займемся этим типом сообщения, оно будет автоматически удалено.

Но на самом деле мы можем добавить в очередь параметр, чтобы при нахождении очереди死亡的消息затем он будет автоматически перенаправлен наExchange, указанныйExchangeчтобы справиться с новостями об этих смертях.

Этот обрабатывает сообщения о смертиExchangeи то, что мы сказали раньшеExchangeНет никакой разницы, вы все еще можете привязать очередь и потреблять сообщения.

    // DLX队列示例
    @Bean
    public Queue dlxQueue() {
        Map<String, Object> arguments = new HashMap<>();
        // 指定消息死亡后发送到ExchangeName="dlx.exchange"的交换机去
        arguments.put("x-dead-letter-exchange","dlx.exchange");
        return new Queue("topicQueue1", false, false, false, arguments);
    }

Приведенный выше код предназначен для установки адресата сообщения в очереди после его смерти, что означает, что после того, как сообщение умирает, оно не удаляется, а пересылается один раз и отправляется другимExchangeидти с.

Так какой смысл это делать? Это зависит от потребностей бизнеса, но это будет использоваться в следующем разделе, а затем посмотрите вниз ~

5. 💡Отложенная очередь

RabbitMQНет такого понятия как очередь с задержкой в ​​гене А очереди с задержкой.

Представьте себе сценарий, мы закроем заказ, если оплата не будет произведена в течение 15 минут после размещения заказа, Это очень классический сценарий демо-потребления.RabbitMQДля этого нам нужно совместить TTL+DLX.

Сначала установите время истечения срока действия сообщения заказа на 15 минут, а затем очередь перенаправит сообщение в наш набор после истечения срока действия.DLX-Exchange,DLX-ExchangeЗатем он будет распределен в очередь, к которой он привязан, и наши потребители будут потреблять сообщения в этой очереди, так что потребление может быть отложено на пятнадцать минут.

Действительно супер~~~ просто

постскриптум

Все кончено, все кончено, все кончено,RabbitMQНа этом закончилось, хотя некоторые вещи не упомянуты, такие как: зеркальная очередь, потому что я ею не пользовался и вообще не умею сам, поэтому мне лень и писать не буду,RabbitMQВ конце концов, это не естественная распределённая очередь сообщений, и зеркалирование по-прежнему немного хлопотно.

Вроде писалась одна за другой уже почти месяц.Много всего и немного сложного.Или я напишу статью в следующем номере,чтоб пересмотреть ее,нарисовать ментальную карту что-ли,разложить на все, и выберите еще несколько.小册六折码.

Наконец, я буду рекламировать на Youhu.Недавно Nuggets установили программу с открытым исходным кодом на GitHub -open-source, целью которого является включение всевозможных забавных и полезных библиотек с открытым исходным кодом. Если у вас есть библиотеки с открытым исходным кодом, которые вы хотите порекомендовать или поделиться, вы можете принять участие и внести свой вклад в этот план с открытым исходным кодом.StartОн также неуклонно растет, и участие в нем также может увеличить экспозицию ваших собственных проектов, убивая двух зайцев одним выстрелом.

В то же время у этой библиотеки с открытым исходным кодом есть родственный проект —open-source-translation, целью которого является найм волонтеров по переводу технических статей для перевода технических статей, Стремитесь быть лучшим переводчиком с открытым исходным кодом, переводите высококачественные документы в отрасли и вносите свой вклад в рост технических специалистов.


В эти дни произошло много всего. Youhu заставил меня перейти на уровень 3 до конца августа, поэтому лайки читателей очень важны для меня. Надеюсь, вы поднимете руку и поможете мне~

Ну а выше все содержание этого выпуска.Спасибо что вы здесь.Добро пожаловать лайкайте,собирайте и комментируйте эту статью.👍Каждый ваш лайк-самая большая мотивация для моего творчества.

Я Эр, псевдолитературный программист, который всегда хотел выводить знания, до встречи в следующем выпуске.

Код для этой статьи:Адрес облака кодаАдрес GitHub