ActiveMQ очереди сообщений

Java сервер Spring LevelDB

Введение

Возможности ActiveMQ

ActiveMQ — это промежуточное программное обеспечение для обмена сообщениями с открытым исходным кодом, созданное Apache, целью которого является обеспечение эффективного, масштабируемого, стабильного и безопасного обмена сообщениями на уровне предприятия для приложений. Целью его разработки является предоставление стандартного, ориентированного на сообщения, многоязычного промежуточного программного обеспечения для обмена сообщениями, интегрированного в приложения. ActiveMQ реализует JMS 1.1 и предоставляет множество дополнительных функций, таких как управление JMX, управление ведущим и подчиненным, взаимодействие групп сообщений, приоритет сообщений, отложенное получение сообщений, виртуальные получатели, сохранение сообщений, мониторинг очереди сообщений и многое другое. Его основные особенности:

  1. Поддерживает клиенты и протоколы на нескольких языках, включая Java, C, C++, C#, Ruby, Perl, Python, PHP и другие. Протоколы включают OpenWire, Stomp, AMQP, MQTT.
  2. Предоставляет расширенные функции, такие как групповая связь сообщений, приоритезация сообщений, отложенное получение сообщений, виртуальные получатели, сохранение сообщений.
  3. Полная поддержка спецификаций JMS 1.1 и J2EE 1.4 (включая постоянство, распределенные транзакционные сообщения, транзакции)
  4. Поддержка среды Spring, ActiveMQ можно легко встроить в приложения Spring с помощью файлов конфигурации Spring.
  5. Проходит стандартные тесты серверов J2EE, такие как TomEE, Geronimo, JBoss, GlassFish, WebLogic
  6. Разнообразие методов подключения, ActiveMQ предоставляет различные режимы подключения, такие как in-VM, TCP, SSL, NIO, UDP, многоадресная рассылка, JGroups, JXTA.
  7. Поддерживает быстрое сохранение сообщений с помощью JDBC и журнала.
  8. Предназначен для высокопроизводительных кластеров, клиент-сервер, одноранговой связи и т. д.
  9. Предоставляет независимый от технологий и языка интерфейс REST API.
  10. Поддержка метода Ajax для вызова ActiveMQ
  11. ActiveMQ можно легко интегрировать с технологиями веб-сервисов, такими как CXF, Axis и т. д., для обеспечения надежного обмена сообщениями.
  12. Доступен в качестве поставщика JMS в памяти, идеально подходит для модульного тестирования JMS.

Базовые концепты

Поскольку ActiveMQ полностью поддерживает JMS 1.1, его основные концепции согласуются со спецификацией JMS 1.1 с точки зрения пользователя Java.

Модель обмена сообщениями
  1. Модель «точка-точка» Очередь используется в качестве средства передачи сообщений для удовлетворения модели производитель-потребитель.Сообщение может быть использовано только одним потребителем, а неиспользованные сообщения сохраняются в очереди до тех пор, пока они не будут использованы или истечет время ожидания.

  2. Модель публикации-подписки (Pub/Sub) Использование топика в качестве носителя для передачи сообщений аналогично широковещательному режиму.Публикатор публикует сообщение, и сообщение доставляется всем подписчикам через топик.Пользователи, подписавшиеся после трансляции сообщения, не получат сообщение.

основные компоненты

Основные компоненты, включаемые при использовании ActiveMQ, такие же, как и JMS:

  1. Брокер, брокер сообщений, представляет сущность сервера очереди сообщений, принимает клиентские соединения и предоставляет основные службы для обмена сообщениями.
  2. Производитель, производитель сообщений, инициатор бизнеса, несет ответственность за создание сообщений и их передачу Брокеру.
  3. Потребитель, потребитель сообщений, бизнес-процессор, отвечает за получение сообщений от брокера и обработку бизнес-логики.
  4. Топик, топик, унифицированный набор сообщений в режиме публикации-подписки, разные производители отправляют сообщения в топик, а брокер распределяет их по разным подписчикам для осуществления трансляции сообщения.
  5. Очередь, очередь, в режиме точка-точка конкретный производитель отправляет сообщения в определенную очередь, а потребители подписываются на определенную очередь для получения сообщений и выполнения обработки бизнес-логики.
  6. Сообщение, тело сообщения, представляет собой пакет данных, закодированный в соответствии с фиксированным форматом, определенным различными протоколами связи, для инкапсуляции бизнес-данных и реализации передачи сообщения.

Поскольку эти концепции уже были введены в JMS, они не будут здесь подробно описываться.

Соединитель

Основная функция ActiveMQ Broker — предоставить механизм связи для клиентских приложений.Для этой цели ActiveMQ предоставляет механизм соединения и использует коннектор для описания этого механизма соединения. В ActiveMQ есть два типа соединителей: один — транспортный соединитель (транспортный соединитель), используемый для связи между клиентом и брокером сообщений (client-to-broker), а другой — между серверами брокера сообщений (broker) — to-broker) сетевой разъем для связи. Соединитель представлен URI (унифицированным указателем ресурсов), а формат URI:<schema name>:<hierarchical part>[?<query>][#<fragment>]имя схемы представляет протокол, Например: foo://имя пользователя:пароль@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose

Часть имени схемы — foo, иерархическая часть — username:password@example.com:8042/over/there/index.dtb, запрос — type=animal&name=narwhal, а фрагмент — нос.

  1. транспортный соединитель Для обмена сообщениями как производители сообщений, так и потребители сообщений (вместе называемые клиентами) должны подключаться к серверу брокера сообщений, а связь между клиентом и сервером брокера сообщений осуществляется через транспортные соединители. Во многих случаях у пользователей разные потребности при подключении к брокерам сообщений. Одни уделяют больше внимания производительности, а другие — безопасности. Поэтому ActiveMQ предоставляет на выбор ряд протоколов подключения для покрытия этих сценариев использования. С точки зрения брокера сообщений транспортный соединитель используется для обработки и мониторинга клиентских подключений.Проверьте демонстрационный файл конфигурации ActiveMQ (/examples/conf/activemq-demo.xml), конфигурация транспортного соединения выглядит следующим образом:
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="ws" uri="ws://localhost:61614/" />
        </transportConnectors>

Транспортные соединители определены в<transportConnectors>элемент, а<transportConnector>элемент определяет конкретный коннектор, коннектор должен иметь свое собственное уникальное имя и атрибуты URI, ноdiscoveryUriСвойства необязательны. В настоящее время в последней версии ActiveMQ 5.15 обычно используются следующие протоколы подключения транспортного соединителя: vm, tcp, udp, multicast, nio, ssl, http, https, websocket, amqp, mqtt, stomp и т. д.

  • vm, который позволяет клиенту и серверу сообщений взаимодействовать непосредственно внутри виртуальной машины.Используемое соединение — это не соединение Socket, а прямой вызов локального метода виртуальной машины, что позволяет избежать накладных расходов на передачу по сети. Вариант использования ограничен сервером и клиентом в одной и той же JVM.
  • tcp клиент подключается к удаленному серверу сообщений через TCP.
  • udp клиент подключается к удаленному серверу сообщений через UDP.
  • multicast, который позволяет использовать многоадресную передачу для подключения к серверу сообщений.
  • nio, nio и tcp имеют одну и ту же функцию, за исключением того, что nio использует пакет java NIO, который может обеспечить более высокую производительность в некоторых сценариях.
  • ssl, ssl позволяет пользователям использовать SSL через TCP.
  • http и https, что позволяет клиентам подключаться с помощью REST или Ajax, а это означает, что сообщения можно отправлять непосредственно в ActiveMQ с помощью Javascript.
  • websocket, который позволяет клиентам подключаться к серверу сообщений через WebSocket в HTML5.
  • amqp, версия 5.8 стала поддерживаться.
  • mqtt, stomp, версия 5.6 стала поддерживаться.

Конкретную конфигурацию каждого протокола см. на официальном веб-сайте (http://activemq.apache.org/uri-protocols.html). В дополнение к указанным выше базовым протоколам ActiveMQ также поддерживает некоторые расширенные протоколы, которые также можно настроить с помощью URI, такие как Failover и Fanout.

  • Отказоустойчивость — это механизм повторного подключения, который работает на верхнем уровне протокола подключения, описанного выше, для установления надежного транспорта. Его синтаксис конфигурации позволяет указать любое количество составных URI, и он будет автоматически выбирать один из URI для попытки установить соединение, а если соединение будет неудачным, он будет продолжать выбирать другие URI для попытки. Например, синтаксис конфигурации: failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
  • Fanout — это механизм повторного подключения и репликации, который также работает поверх других подключений и реплицирует сообщения на несколько серверов сообщений посредством репликации. Например, синтаксис конфигурации: fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
  1. сетевой разъем Во многих случаях данные, которые нам нужно обработать, могут быть огромными. В этом случае одному серверу сложно их поддерживать. Это требует использования функции кластера. По этой причине ActiveMQ предоставляет режим сетевого подключения. Экземпляры серверов соединяются вместе для предоставления внешних служб в целом, тем самым улучшая общие возможности службы внешних сообщений. Очереди и списки потребителей могут совместно использоваться экземплярами сервера, подключенными таким образом, для достижения цели распределенных очередей, а сетевые соединители используются для настройки связи между серверами.

使用网络连接器的简单场景
)

Как показано на рисунке, серверы S1 и S2 подключены через NewworkConnector, сообщения, отправленные производителем P1, могут быть получены потребителями C3 и C4, а сообщения, отправленные производителем P3, также могут быть получены потребителями C1 и C2. Чтобы использовать функцию сетевого соединителя, вам необходимо добавить следующую конфигурацию под узлом брокера в файле activemq.xml сервера S1 (при условии, что 192.168.11.23:61617 — это адрес S2):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>    
</networkConnectors>

В этом случае S1 может отправлять сообщения S2, но это только односторонняя связь, и сообщения, отправленные S2, не могут быть отправлены S1. Если вы хотите, чтобы S1 также получал сообщения от S2, вам нужно добавить следующую конфигурацию в узел брокера в файле activemq.xml S2 (при условии, что 192.168.11.45:61617 — это адрес S1):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>    
</networkConnectors>

Таким образом, S1 и S2 могут обмениваться данными в обоих направлениях. В настоящее время широко используемые протоколы сетевого соединителя в последней версии ActiveMQ 5.15 являются статическими и многоадресными.

  • static, статический протокол для создания статических конфигураций для нескольких прокси в сети, этот протокол конфигурации поддерживает составные URI (т.е. URI, которые содержат другие URI). Напримерstatic://(tcp://ip:61616,tcp://ip2:61616)
  • multicast, многоадресный протокол, сервер сообщений транслирует свои собственные службы, а также находит другие прокси. Этот метод используется для динамической идентификации между серверами, а не для настройки статических групп IP.

Если вам интересна эта часть, вы можете прочитать официальную документацию: http://activemq.apache.org/networks-of-brokers.html

хранилище сообщений

В спецификации JMS есть два способа распространения сообщений: непостоянный и постоянный. Для непостоянных сообщений разработчики JMS должны убедиться, что они делают все возможное для распространения сообщений, но сообщения не хранятся постоянно; сообщения, распространяемые постоянным образом, должны храниться постоянно. Непостоянные сообщения часто используются для отправки уведомлений или данных в реальном времени.Вы можете выбрать непостоянные сообщения, если вам важна производительность системы и вы теряете некоторые сообщения, не влияя на нормальную работу бизнеса. После отправки постоянного сообщения на сервер сообщений, если текущий потребитель сообщений не запущен, сообщение продолжает существовать и не будет удалено с сервера сообщений до тех пор, пока оно не будет обработано и подтверждено потребителем сообщений.

ActiveMQ поддерживает оба вышеуказанных метода, а также поддерживает восстановление сообщений путем буферизации сообщений промежуточного состояния в памяти. Подводя итог, можно сказать, что в ActiveMQ существует три типа хранения сообщений: хранилище в памяти, хранилище в файлах и хранилище в базе данных. Для специального использования ActiveMQ предоставляет подключаемый механизм хранения сообщений, аналогичный многоточечной передаче сообщений, в основном реализующий следующие типы:

  • AMQ, метод хранения сообщений по умолчанию в ActiveMQ 5.0 и более ранних версиях, представляет собой решение для хранения сообщений на основе файлов с поддержкой транзакций. По этой схеме само сообщение сохраняется в виде журналов и хранится в журнале данных. Кроме того, к сообщениям в журнале также делается справочный индекс, что удобно для быстрого поиска сообщений.
  • KahaDB также представляет собой файловый метод хранения сообщений с поддержкой транзакций. Начиная с версии 5.3 для хранения сообщений рекомендуется использовать KahaDB. Он обеспечивает лучшую масштабируемость и восстанавливаемость, чем хранилище сообщений AMQ.
  • JDBC, основанный на JDBC, хранит сообщения в базе данных, и хранить сообщения в базе данных относительно медленно, поэтому ActiveMQ рекомендует хранить их в сочетании с журналами.Он использует технологию быстрой записи в кэш, что значительно повышает производительность.
  • Хранение в памяти относится к размещению всех сообщений для сохранения в памяти.Поскольку здесь нет динамического кеша, вам необходимо обратить внимание на настройку JVM и размер памяти сервера сообщений.
  • LevelDB, механизм сохраняемости LevelDB, был запущен после версии 5.6. Он использует настраиваемый индекс вместо широко используемого индекса BTree. Его производительность сохранения выше, чем у KahaDB. Хотя метод сохранения по умолчанию по-прежнему KahaDB, LevelDB будет тенденцией . Версия 5.9 также предоставляет метод репликации данных на основе LevelDB и Zookeeper в качестве предпочтительной схемы репликации данных для метода Master-Slave.

Инженерный пример

Экземпляр ActiveMQ для доступа к Java

В спецификации JMS есть два способа доставки сообщений: один — метод очереди (Queue) модели «точка-точка», а другой — метод темы (Topic) модели публикации-подписки. Давайте взглянем на пример доставки тем на Java с использованием ActiveMQ.

импортировать зависимости

Проект Java должен представить зависимость пакета ActiveMQ Версия пакета jar совпадает с установленной версией ActiveMQ:

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.2</version>
    </dependency>
производитель сообщений
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //创建连接
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话,不需要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建 Topic,用作消费者订阅消息
            Topic myTestTopic = session.createTopic("activemq-topic-test1");
            //消息生产者
            MessageProducer producer = session.createProducer(myTestTopic);

            for (int i = 1; i <= 3; i++) {
                TextMessage message = session.createTextMessage("发送消息 " + i);
                producer.send(myTestTopic, message);
            }

            //关闭资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

В режиме "Тема" для публикации сообщений используются производители сообщений. Большая часть кода аналогична режиму "Очередь". Разница в том, что в этом примере тема создается на основе сеанса, который служит получателем сообщений для потребителей. .

потребитель сообщений
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //创建连接
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话,不需要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建 Topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");

            MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            //让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
            Thread.sleep(100 * 1000);
            //关闭资源
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Чтобы продемонстрировать функцию рассылки сообщений нескольким подписчикам в режиме темы, здесь создаются три объекта-потребителя, которые подписываются на одну и ту же тему.Что особенного, так это то, что основной поток засыпает на определенный период времени.Цель это делается для потребления. Объект прослушивателя может продолжать жить, так что консоль может распечатать содержимое отслеживаемого сообщения.

Запустите сервер ActiveMQ

Выполнять непосредственно в каталоге bin ActiveMQactivemq startЭто запускает ActiveMQ

Запустить темуПодписчик

Сначала необходимо запустить основной метод класса TopicSubscriber, чтобы подписчик мог получить сообщение, когда издатель опубликует сообщение.Если порядок выполнения обратный, сообщение будет опубликовано первым, но подписчики не запущены, и не увидишь..

Запустить TopicPublisher

Затем запустите метод main класса TopicPublisher, опубликуйте 3 сообщения в теме, после чего вы сможете увидеть полученное содержимое сообщения в фоновом режиме TopicSubscriber:

消费者接收到消息

Интеграция Spring с ActiveMQ

В реальных проектах, очевидно, более многословно использовать собственный API ActiveMQ для разработки.Посередине код для создания фабрик соединений и создания соединений может быть извлечен и унифицирован фреймворком.Спринг также подумал и помог нам сделать это вещи. ActiveMQ полностью поддерживает конфигурацию клиента и сервера JMS на основе Spring.В следующем примере показано, как использовать режим очереди и режим темы для доставки сообщений в Spring.

импортировать зависимости
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>

В дополнение к пакету activemq в проект необходимо добавить пакет JMS, поддерживаемый Spring. Поскольку создание соединения, сессии и производителя потребляет много системных ресурсов, мы используем его здесь.пул соединенийЧтобы повторно использовать эти ресурсы, также добавьте зависимости activemq-pool.

Файл конфигурации Spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="org.study.mq.activeMQ.spring"/>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/>
    </bean>
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
    <bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
    <bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>

    <bean id="queueContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="queueListener"/>
    </bean>
    <bean id="topic1Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic1Listener"/>
    </bean>
    <bean id="topic2Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic2Listener"/>
    </bean>

</beans>

В коде Java в следующем примере проекта используются аннотации, что сейчас также является привычкой многих программистов, поэтому укажите путь к пакету сканирования аннотаций в начале файла конфигурации.org.study.mq.activeMQ.spring, вы можете изменить имя пакета в соответствии с вашей реальной ситуацией, весь код Java в этом примере находится в этом пакете.

Затем определяется фабричный компонент JMS с использованием класса объединенной фабрики соединений.org.apache.activemq.pool.PooledConnectionFactory, на самом деле во внутреннюю фабрику соединений ActiveMQ добавлена ​​функция пула соединений.Из его внутренней конфигурации видно, что она правильнаяorg.apache.activemq.ActiveMQConnectionFactoryинкапсуляция функций, в то время какActiveMQConnectionFactoryЭтот класс более знаком, он используется, когда приведенный выше пример ActiveMQ для доступа к Java создает фабрику соединений в начале. Свойство BrokerURL настраивает протокол и адрес сервера соединения. Следующая cachingConnectionFactory обычно используется в фактическом коде проекта. Это еще одно усовершенствование фабрики соединений. Она использует функцию кэширования соединения для повышения эффективности. Читатели могут использовать ее по своему усмотрению.

jmsTemplate — это решение Spring для длинного и повторяющегося кода при доступе к JMS.Два основных свойства, которые необходимо настроить, — это connectionFactory и messageConverter, а подключение, сеанс и другие объекты получаются через connectionFactory. messageConverter — это преобразователь сообщений конфигурации, потому что обычно сообщение необходимо предварительно обработать и постобработать перед отправкой и после получения, и преобразователь делает эту работу. Таким образом, фактический код отправляет и получает сообщения напрямую через jmsTemplate, а среда Spring выполняет работу по созданию фабрики соединений, созданию соединения и созданию сеанса каждый раз, когда сообщение отправляется и принимается.

При наличии шаблона JMS также необходимо знать очередь и тему как место назначения для фактической отправки и получения сообщений, поэтому далее мы определяем testQueue и testTopic как примеры двух шаблонов. При асинхронном получении сообщений вам необходимо предоставить класс реализации MessageListener, поэтому queueListener определяется как прослушиватель для асинхронного получения сообщений в режиме очереди, Topic1Listener и Topic2Listener как прослушиватели для асинхронного получения сообщений в режиме темы, а два прослушивателя используются в теме. mode предназначены для демонстрации того, что несколько потребителей могут получать сообщения. Последний объект queueContainer, тема1Контейнер и топик2Контейнер используются для привязки прослушивателей сообщений к определенным адресатам сообщений.

класс обслуживания сообщений

Ниже приведен класс службы сообщений, который использует шаблон JMS для обработки сообщений.

package org.study.mq.activeMQ.spring;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "testQueue")
    private Destination testQueue;

    @Resource(name = "testTopic")
    private Destination testTopic;

    //向队列发送消息
    public void sendQueueMessage(String messageContent) {
        jmsTemplate.send(testQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }

    //向主题发送消息
    public void sendTopicMessage(String messageContent) {
        jmsTemplate.send(testTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }
}

@Service объявляет класс как службу, и многие коды служб в реальных проектах аналогичны. jmsTemplate, определенный в файле конфигурации выше, напрямую вводится в класс MessageService через аннотацию Resource, и его можно использовать напрямую TestQueue и testTopic аналогичны, а очереди и темы, определенные в файле конфигурации, напрямую вводятся в службу класс. Основное внимание уделяется следующим двум методам отправки сообщений: sendQueueMessage отправляет сообщения в очередь и sendTopicMessage отправляет сообщения в темы. Оба режима используют метод отправки jmsTemplate. Первый параметр метода отправки:javax.jms.DestinationВведите, указав адресата сообщения. так какjavax.jms.Queueиjavax.jms.Topicунаследовалиjavax.jms.Destinationинтерфейса, поэтому этот метод применим как в режиме очереди, так и в режиме темы. Второй параметр метода отправкиorg.springframework.jms.core.MessageCreator, где для создания объекта используется анонимный внутренний класс, а текстовое сообщение создается из поддерживаемого объекта Session, чтобы сообщение можно было отправить. Видно, что будь то очередь или топик, код для отправки сообщений через фреймворк Spring намного проще, чем в предыдущих примерах кода Java.

класс прослушивателя сообщений
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("队列监听器接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}

Прослушиватель сообщений очереди проверяет, является ли это типом текстового сообщения при получении сообщения, и печатает содержимое, если это так.

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器1 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器2 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}

Код прослушивателя темы аналогичен коду прослушивателя очереди, за исключением того, что используются разные строки для указания сообщений, полученных в настоящее время разными слушателями при печати.

запустить приложение

Чтобы продемонстрировать пример, напишите класс StartApplication, загрузите Spring в метод main и вызовите методы sendQueueMessage и sendTopicMessage для отправки сообщений после получения службы MessageService.

package org.study.mq.activeMQ.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        MessageService messageService = (MessageService) ctx.getBean("messageService");

        messageService.sendQueueMessage("我的测试消息1");
        messageService.sendTopicMessage("我的测试消息2");
        messageService.sendTopicMessage("我的测试消息3");
    }

}

После запуска службы activeMQ запустите класс StartApplication и посмотрите полученное текстовое сообщение в консоли:

接收到文本消息

Слушатель очереди прослушал одно сообщение, а два прослушивателя темы прослушали два сообщения соответственно.