RabbitMQ очереди сообщений

Java RabbitMQ

Что касается очереди сообщений, то с позапрошлого года периодически перечитывал некоторые материалы. Давно хотел написать, но не находил времени. Недавно встретил нескольких друзей, говорящих о технический выбор этой области Настало время систематизировать и зафиксировать знания в этой области.

На рынке есть много продуктов для очередей сообщений, таких как старый ActiveMQ, RabbitMQ, самый популярный Kafka, который я вижу в настоящее время, и ZeroMQ, RocketMQ, подаренный Apache компанией Alibaba в конце прошлого года, и даже базы данных NoSQL, такие как redis, также поддержка функций MQ. Короче говоря, есть более десятка известных продуктов, и я намерен говорить только о RabbitMQ, Kafka и ActiveMQ, исходя из собственного опыта и интересов.В этой статье сначала будет рассказано о RabbitMQ, а до этого давайте посмотрим на связанных концепциях очередей сообщений.

Что такое очередь сообщений

Сообщение (Message) относится к данным, передаваемым между приложениями. Сообщения могут быть очень простыми, например, содержать только текстовые строки, или более сложными, возможно, содержащими встроенные объекты.

Очередь сообщений (Message Queue) — это метод связи между приложениями.После того как сообщение отправлено, оно может быть немедленно возвращено, а система сообщений может обеспечить надежную доставку сообщения. Издателю сообщения нужно только опубликовать сообщение в MQ, и ему все равно, кто его получит, а потребителю сообщения нужно только получить сообщение от MQ, независимо от того, кто его публикует. Таким образом, ни издатель, ни пользователь не должны знать о существовании друг друга.

Зачем использовать очереди сообщений

Из приведенного выше описания видно, что очередь сообщений — это асинхронный механизм взаимодействия между приложениями.Когда нам нужно использовать MQ?

На примере обычной системы заказов бизнес-логика после нажатия пользователем кнопки [Разместить заказ] может включать в себя: вычет запасов, формирование соответствующих документов, отправку красных конвертов и отправку SMS-уведомлений. На ранней стадии развития бизнеса эти логики могут быть объединены и выполняться синхронно.По мере развития бизнеса количество заказов увеличивается, и производительность системных служб необходимо улучшать.В это время некоторые операции, выполняющие не нужно вступать в силу немедленно, можно разделить и выполнить асинхронно, например, выдача красных конвертов, отправка SMS-уведомлений и т. д. В этом сценарии MQ можно использовать для отправки сообщения в MQ после основного процесса размещения заказа (например, вычета запасов и создания соответствующих документов) для быстрого завершения основного процесса, в то время как другой отдельный поток извлекает сообщения MQ (или MQ отправляет сообщения). сообщения), когда обнаруживается, что в MQ есть такие сообщения, как красные конверты или текстовые сообщения, выполняется соответствующая бизнес-логика.

Вышеупомянутый случай разделения бизнеса Другие распространенные сценарии включают консистентность в конечном счете, широковещательную рассылку, поэтапное управление потоком и т. д.

Возможности RabbitMQ

RabbitMQ — это реализация AMQP с открытым исходным кодом, разработанная на языке Erlang.

AMQP: расширенная очередь сообщений, расширенный протокол очереди сообщений. Это открытый стандарт протокола прикладного уровня, разработанный для промежуточного программного обеспечения, ориентированного на сообщения.Клиенты и промежуточное программное обеспечение сообщений, основанные на этом протоколе, могут передавать сообщения и не ограничиваются продуктами, языками разработки и другими условиями.

Первоначально RabbitMQ возник из финансовой системы и используется для хранения и пересылки сообщений в распределенной системе.Он хорошо работает с точки зрения простоты использования, масштабируемости и высокой доступности. Особенности включают в себя:

  1. Надежность RabbitMQ использует некоторые механизмы для обеспечения надежности, такие как постоянство, подтверждение передачи и подтверждение публикации.

  2. Гибкая маршрутизация Сообщения направляются через Exchange до того, как они попадут в очередь. Для типичной функциональности маршрутизации RabbitMQ уже предоставляет некоторые встроенные возможности Exchange для реализации. Для более сложных функций маршрутизации несколько обменов могут быть связаны вместе, а их собственные обмены также могут быть реализованы с помощью механизма подключаемых модулей.

  3. Кластеризация сообщений (кластеризация) Несколько серверов RabbitMQ могут образовывать кластер для формирования логического брокера.

  4. Высокодоступные очереди Очереди можно зеркально отразить на компьютерах в кластере, чтобы очереди оставались доступными в случае сбоя некоторых узлов.

  5. Мультипротокол RabbitMQ поддерживает различные протоколы очередей сообщений, такие как STOMP, MQTT и другие.

  6. Много клиентов RabbitMQ поддерживает практически все распространенные языки, такие как Java, .NET, Ruby и т. д.

  7. Интерфейс управления RabbitMQ предоставляет простой в использовании пользовательский интерфейс, который позволяет пользователям отслеживать и управлять многими аспектами брокера сообщений.

  8. Отслеживание Если сообщение является ненормальным, RabbitMQ предоставляет механизм отслеживания сообщения, потребитель может узнать, что произошло.

  9. Система плагинов (Система плагинов) RabbitMQ предоставляет множество плагинов, которые можно расширять разными способами, или вы можете написать свои собственные плагины.

Концепции в RabbitMQ

модель сообщения

Абстракция модели — это один и тот же процесс для всех продуктов MQ: Потребитель подписывается на очередь. Производитель (producer) создает сообщение, затем публикует его в очередь (queue) и, наконец, отправляет сообщение прослушивающим потребителям.

消息流

Основные понятия RabbitMQ

Вышеприведенное является лишь самым простым абстрактным описанием, и есть более подробные концепции, которые необходимо объяснить, когда речь идет о RabbitMQ. Как упоминалось выше, RabbitMQ — это реализация протокола AMQP с открытым исходным кодом, поэтому его внутреннее устройство фактически представляет собой базовые концепции AMQP:

RabbitMQ 内部结构

  1. Сообщение Сообщение, сообщение анонимно, состоит из заголовка сообщения и тела сообщения. Тело сообщения непрозрачно, а заголовок сообщения состоит из ряда необязательных атрибутов, включая ключ маршрутизации (ключ маршрутизации), приоритет (приоритет по отношению к другим сообщениям), режим доставки (указывающий, что сообщению может потребоваться постоянное хранение), и Т. Д.
  2. Издатель Производитель сообщения также является клиентским приложением, которое публикует сообщения для обмена.
  3. Обмен Обмен, который получает сообщения, отправленные производителями, и направляет их в очереди на сервере.
  4. Связывание Привязки для ассоциаций между очередями сообщений и обменами. Привязка — это правило маршрутизации, которое связывает обмен и очередь сообщений на основе ключа маршрутизации, поэтому обмен можно понимать как таблицу маршрутизации, состоящую из привязок.
  5. Очередь Очередь сообщений, используемая для хранения сообщений до тех пор, пока они не будут отправлены потребителям. Это контейнер для сообщения и пункт назначения для сообщения. Сообщение может быть помещено в одну или несколько очередей. Сообщение было в очереди, ожидая, пока потребитель подключится к очереди, чтобы забрать его.
  6. Связь Сетевое подключение, например TCP-соединение.
  7. Канал канал, независимый двунаправленный канал потока данных в мультиплексном соединении. Канал — это виртуальное соединение, установленное в реальном TCP-соединении.Команды AMQP отправляются через канал.Будь то публикация сообщения, подписка на очередь или получение сообщения, эти действия выполняются через канал. Поскольку установление и уничтожение TCP очень дорого обходится операционной системе, для повторного использования TCP-соединения вводится понятие канала.
  8. Потребитель Потребитель сообщений, который представляет собой клиентское приложение, получающее сообщения из очереди сообщений.
  9. Виртуальный хост Виртуальный хост, представляющий набор обменов, очередей сообщений и связанных объектов. Виртуальные хосты — это отдельные серверные домены, использующие одну и ту же среду аутентификации и шифрования. Каждый виртуальный хост по сути представляет собой мини-версию сервера RabbitMQ со своими очередями, обменами, привязками и механизмами разрешений. vhost является основой концепции AMQP и должен быть указан при подключении.Vhost по умолчанию для RabbitMQ — / .
  10. Маклер Представляет сущность сервера очереди сообщений.

Маршрутизация сообщений в AMQP

Процесс маршрутизации сообщений в AMQP несколько отличается от привычного Java-разработчикам JMS.В AMQP добавлены роли Exchange и Binding. Производитель публикует сообщение в Exchange, сообщение, наконец, поступает в очередь и принимается потребителем, а Binding определяет, в какую очередь должно быть отправлено сообщение Exchange.

AMQP 的消息路由过程

Тип обмена

Exchange распределяет сообщения в соответствии с различными стратегиями распространения различных типов. В настоящее время существует четыре типа: прямое, разветвление, тема и заголовки. Заголовки соответствуют заголовку сообщения AMQP, а не ключу маршрутизации, кроме того, обмен заголовками точно такой же, как и прямой обмен, но производительность намного хуже, и он почти бесполезен в настоящее время, поэтому давайте посмотрим на другие три типа напрямую:

  1. direct
    direct 交换器
    Если ключ маршрутизации в сообщении совпадает с ключом привязки в Binding, биржа отправит сообщение в соответствующую очередь. Ключ маршрутизации точно соответствует имени очереди. Если очередь привязана к коммутатору и требует, чтобы ключ маршрутизации был "dog", будут пересылаться только сообщения, отмеченные ключом маршрутизации "dog", а не "dog.puppy" или " dog.puppy" будет переадресован.dog.guard" и так далее. Это точное совпадение, одноадресный шаблон.
  2. fanout
    fanout 交换器
    Каждое сообщение, отправленное на обмен типа разветвления, распространяется на все связанные очереди. Разветвленный обмен не обрабатывает ключи маршрутизации, он просто привязывает очередь к обмену, и каждое сообщение, отправленное на обмен, перенаправляется во все очереди, связанные с этим обменом. Как и при широковещании подсети, узлы в каждой подсети получают копию сообщения. Тип разветвления является самым быстрым для пересылки сообщений.
  3. topic
    topic 交换器
    Обмен темами назначает атрибут ключа маршрутизации сообщения посредством сопоставления с шаблоном и сопоставляет ключ маршрутизации с определенным шаблоном.В это время очередь должна быть привязана к шаблону. Он разбивает строку ключей маршрутизации и ключей привязки на слова, разделенные точками. Он также распознает два подстановочных знака: символ «#» и символ «*». # соответствует 0 или более словам, * соответствует не более чем одному слову.

Установка RabbitMQ

Вообще говоря, вам нужно установить Erlang перед установкой RabbitMQ, вы можете перейти кОфициальный сайт Эрлангаскачать. затем перейтиОфициальный сайт RabbitMQЗагрузите установочный пакет и разархивируйте его. В зависимости от операционной системы официальный сайт предоставляет соответствующие инструкции по установке:Windows,Debian / Ubuntu,RPM-based Linux,Mac

Если вы являетесь пользователем Mac, я лично рекомендую использовать HomeBrew для установки, и brew необходимо обновить перед установкой:

brew update

Затем установите сервер rabbitmq:

brew install rabbitmq

Таким образом, RabbitMQ устанавливается, и Erlang, от которого он зависит, будет автоматически установлен в процессе установки.

Работа и управление RabbitMQ

  1. запускать Запуск очень прост. Найдите каталог sbin в каталоге, где находится установленный RabbitMQ. Вы можете видеть, что в этом каталоге есть 6 исполняемых файлов, начинающихся с rabbitmq. Вы можете напрямую запустить rabbitmq-server. Место установки RabbitMQ: заменяется на . , команда запуска:
./sbin/rabbitmq-server

Если запуск нормальный, вы увидите некоторую информацию о процессе запуска, а окончательный вариант завершен с 7 плагинами, что также означает, что 7 плагинов загружаются по умолчанию при запуске.

正常启动

  1. начать в фоновом режиме Если вы хотите, чтобы RabbitMQ работал в фоновом режиме как демон, вы можете добавить параметр -detached при запуске:
./sbin/rabbitmq-server -detached
  1. Состояние сервера запросов В каталоге sbin есть особенно важный файл с именем rabbitmqctl, который обеспечивает почти универсальное решение для нужд управления RabbitMQ и может предоставить большинство команд по эксплуатации и обслуживанию. Чтобы запросить информацию о состоянии сервера RabbitMQ, вы можете использовать параметр status :
./sbin/rabbitmqctl status

Эта команда выведет много информации о сервере, такую ​​как версии RabbitMQ и Erlang, имя ОС, память и т. д.

  1. Завершите работу узла RabbitMQ. Мы знаем, что RabbitMQ написан на языке Erlang, а в Erlang есть два понятия: узел и приложение. Узел — это каждый экземпляр виртуальной машины Erlang, и на одном узле могут работать несколько приложений Erlang. Между узлами возможна локальная связь (независимо от того, работают они на одном сервере или нет). Например, приложение, работающее на узле A, может вызвать метод приложения на узле B, точно так же, как вызов локальной функции. Если приложение по какой-либо причине падает, узел Erlang автоматически попытается перезапустить приложение. Если вы хотите закрыть весь узел RabbitMQ, вы можете использовать параметр stop :
./sbin/rabbitmqctl stop

Он свяжется с локальным узлом и сообщит ему о корректном завершении работы.Вы также можете указать, чтобы закрыть различные узлы, включая удаленные узлы, просто передайте параметр -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node Имя узла по умолчанию — rabbit@server, если ваше имя хоста — server.example.com, то имя узла — rabbit@server.example.com.

  1. Закройте приложение RabbitMQ. Если вы просто хотите закрыть приложение, оставив узел Erlang работающим, вы можете использовать stop_app:
./sbin/rabbitmqctl stop_app

Эта команда будет полезна в режиме кластера, описанном ниже.

  1. Запустите приложение RabbitMQ.
./sbin/rabbitmqctl start_app
  1. Сбросить узел RabbitMQ
./sbin/rabbitmqctl reset

Эта команда очистит все очереди.

  1. Просмотр объявленных очередей
./sbin/rabbitmqctl list_queues
  1. Просмотр переключателей
./sbin/rabbitmqctl list_exchanges

Команда также может добавлять параметры, такие как список имени и типа обмена, является ли он постоянным, удаляется ли он автоматически:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
  1. Посмотреть привязки
./sbin/rabbitmqctl list_bindings

Доступ к Java-клиенту

RabbitMQ поддерживает многоязычный доступ, возьмите Java в качестве примера, чтобы увидеть шаги использования RabbitMQ в целом.

  1. Добавьте зависимости в файл pom проекта maven.
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
  1. производитель сообщений
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //发布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}
  1. потребитель сообщений
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        //获得信道
        final Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //绑定队列,通过键 hola 将队列和交换器绑定起来
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            //消费消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消费的路由键:" + routingKey);
                    System.out.println("消费的内容类型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //确认消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消费的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
  1. Запустите сервер RabbitMQ.
./sbin/rabbitmq-server
  1. управлять потребителем Сначала запустите приемник, чтобы, когда производитель отправляет сообщение, вы могли видеть запись сообщения в бэкэнде потребителя.
  2. запустить продюсер Затем запустите Producer, опубликуйте сообщение, и вы увидите полученное сообщение в консоли Consumer:
    Consumer 控制台

Кластер RabbitMQ

Одной из лучших функций RabbitMQ является встроенная кластеризация, которая предназначена для того, чтобы позволить потребителям и производителям продолжать работу в случае сбоя узла, а также линейно масштабировать пропускную способность обмена сообщениями путем добавления дополнительных узлов. RabbitMQ внутренне использует распределенную коммуникационную структуру OTP, предоставленную Erlang, для удовлетворения вышеуказанных требований, чтобы клиент мог повторно подключиться к любому другому узлу в кластере, чтобы продолжить создание и потребление сообщений, даже если соединение с узлом RabbitMQ потеряно.

Некоторые концепции кластера RabbitMQ

RabbitMQ всегда записывает следующие четыре типа внутренних метаданных:

  1. метаданные очереди Включая имена очередей и их свойства, например, являются ли они постоянными и удаляются ли они автоматически.
  2. Обмен метаданными Имя биржи, тип, свойства
  3. связывание метаданных Внутри находится таблица, в которой записано, как сообщения направляются в очередь.
  4. метаданные виртуального хоста Предоставляет пространство имен и свойства безопасности для очередей, обменов, привязок внутри виртуальных хостов.

В одном узле RabbitMQ будет хранить всю эту информацию в памяти, а также хранить очереди, обмены и привязки, помеченные как постоянные на диск. Сохранение на диск гарантирует возможность восстановления очередей и обменов после перезапуска узла. В режиме кластера также есть два варианта: сохранить на жесткий диск (настройка по умолчанию для независимых узлов) и сохранить в памяти.

Если вы создаете очередь в кластере, кластер будет создавать полную информацию об очереди (метаданные, состояние, содержимое) только на одном узле, а не на всех узлах. В результате только узел-владелец очереди знает все об очереди, поэтому, когда узел кластера выходит из строя, очередь и привязки этого узла исчезают, а также теряются все новые сообщения, соответствующие привязкам очереди. К счастью, после RabbitMQ 2.6.0 зеркальные очереди предоставляются, чтобы избежать недоступности содержимого очереди, вызванной сбоями узлов кластера.

В кластере RabbitMQ можно совместно использовать пользователей, виртуальные хосты, биржи и т. д. Все данные и состояния должны реплицироваться на всех узлах, за исключением упомянутой выше очереди сообщений. Узлы RabbitMQ могут динамически присоединяться к кластеру.

При объявлении очередей, обменов и привязок в кластере эти операции не будут возвращаться до тех пор, пока все узлы кластера не зафиксируют изменения метаданных. В кластере есть два типа узлов памяти и дисковых узлов.Хотя узлы памяти не выполняют запись на диск, их выполнение лучше, чем у дисковых узлов. Узлы памяти могут обеспечить превосходную производительность, а дисковые узлы могут обеспечить доступность информации о конфигурации после перезапуска узла.Как сбалансировать эти два фактора в кластере?

Для RabbitMQ требуется только один дисковый узел в кластере, все остальные узлы могут быть узлами памяти, и когда узлы присоединяются к кластеру или покидают его, они должны уведомлять по крайней мере один дисковый узел об изменении. Если есть только один дисковый узел, и этот узел просто выходит из строя, кластер может продолжать маршрутизацию сообщений, но не может создавать очереди, создавать обмены, создавать привязки, добавлять пользователей, изменять разрешения или добавлять или удалять узлы кластера. Другими словами, если единственный дисковый узел в кластере выходит из строя, кластер все еще может работать, но ничего нельзя изменить, пока этот узел не восстановится.

Настройка и запуск кластера RabbitMQ

Если вы запускаете несколько узлов RabbitMQ одновременно на одном компьютере для формирования кластера, только запуск второго и третьего узлов описанным выше способом приведет к сбою запуска из-за конфликтов имен узлов и портов. Поэтому перед каждым вызовом команды rabbitmq-server задайте переменные среды RABBITMQ_NODENAME и RABBITMQ_NODE_PORT, чтобы явно указать уникальное имя узла и порт. В следующем примере номер порта начинается с 5672, и каждый новый запущенный узел увеличивается на 1, а узлы также называются test_rabbit_1, test_rabbit_2 и test_rabbit_3 соответственно.

Запустите первый узел:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

Запустите второй узел:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

Перед запуском второй ноды рекомендуется отключить активированный по умолчанию плагин в RabbitMQ, иначе возникнет конфликт номеров портов при использовании плагина, что приведет к неудачному запуску ноды.

Теперь второй узел и первый узел являются независимыми узлами, они не знают о существовании других узлов. Узлы, которые присоединяются после первого узла в кластере, должны получить метаданные в кластере, поэтому необходимо сначала остановить приложение RabbitMQ, работающее на узле Erlang, сбросить метаданные узла, а затем присоединиться и получить метаданные. кластера. Наконец, перезапустите приложение RabbitMQ.

Остановите приложение второго узла:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

Сбросьте метаданные второго узла:

./sbin/rabbitmqctl -n test_rabbit_2 reset

Второй узел присоединяется к кластеру, состоящему из первого узла:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost

Запустите приложение для 2-го узла

./sbin/rabbitmqctl -n test_rabbit_2 start_app

Процесс настройки третьего узла аналогичен процессу настройки второго узла:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost

./sbin/rabbitmqctl -n test_rabbit_3 start_app

Работа и обслуживание кластера RabbitMQ

Остановите указанный узел, например, остановите второй узел:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

Проверьте состояние кластера узла 3:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status