Введение
Возможности RocketMQ
RocketMQ - это промежуточное ПО для распределенного обмена сообщениями с открытым исходным кодом от Alibaba, выпущенное в 2012 году. Оно было передано в дар Apache Software Foundation и 25 сентября 2017 года стало проектом высшего уровня Apache. В качестве отечественного промежуточного программного обеспечения, которое неоднократно проходило крещение в «суперпроектах», таких как Double Eleven от Alibaba, и имеет стабильную и выдающуюся производительность, в последние годы оно используется все больше и больше благодаря своей высокой производительности, низкой задержке и высокой надежности. используется отечественными предприятиями. Его основные особенности:
1. Гибкая масштабируемость
RocketMQ естественно поддерживает кластеры, и каждый из его четырех основных компонентов (сервер имен, брокер, производитель и потребитель) можно масштабировать горизонтально без единой точки отказа.
2. Массивная возможность накопления сообщений
RocketMQ использует принцип нулевого копирования для реализации накопления сверхбольших сообщений.Говорят, что одна машина может поддерживать накопление сотен миллионов сообщений и по-прежнему поддерживает низкую задержку записи после накопления такого количества сообщений.
3. Поддержка последовательных сообщений
Гарантируется, что потребители сообщений потребляют сообщения в том порядке, в котором они были отправлены. Последовательные сообщения делятся на глобальный порядок и локальный порядок, Как правило, рекомендуется использовать локальный порядок, то есть производитель реализует путем последовательной отправки сообщений определенного типа в одну и ту же очередь.
4. Несколько методов фильтрации сообщений
Фильтрация сообщений делится на фильтрацию на стороне сервера и фильтрацию на стороне потребителя. При фильтрации на стороне сервера фильтрация может выполняться в соответствии с требованиями потребителей сообщений. Преимущество заключается в сокращении ненужной передачи сообщений. Недостатком является увеличение нагрузки на сервер сообщений и относительно сложная реализация. Фильтрация на стороне потребителя полностью реализуется конкретными приложениями.Этот метод более гибкий.Недостатком является то, что потребителям сообщений будет передано много бесполезных сообщений.
5. Поддержка сообщений о транзакциях
В дополнение к поддержке обычных сообщений и последовательных сообщений RocketMQ также поддерживает сообщения о транзакциях.Эта функция предоставляет еще одно решение для распределенных транзакций.
6. Ретроспективное потребление
Ретроспективное потребление относится к сообщениям, которые были успешно обработаны потребителями. Из-за бизнес-требований RocketMQ поддерживает ретроспективное потребление в соответствии со временем.Время измеряется с точностью до миллисекунд, и его можно отследить вперед или назад.
Базовые концепты
Ниже приведена схема структуры развертывания RocketMQ, которая включает четыре основных компонента RocketMQ: Сервер имен, Брокер, Производитель и Потребитель.Каждый компонент может быть развернут в режиме кластера для горизонтального расширения.
режиссер
Производитель отвечает за создание сообщений, а производители отправляют сообщения, созданные системой бизнес-приложений, на сервер сообщений. RocketMQ предоставляет три способа отправки сообщений: синхронный, асинхронный и односторонний.
Отправить синхронно
Синхронная отправка означает, что после того, как отправитель сообщения отправит данные, он отправит следующий пакет данных после получения ответа от получателя. Обычно используется для важных уведомлений, таких как электронные письма с важными уведомлениями и маркетинговые текстовые сообщения.
Асинхронная отправка
Асинхронная отправка означает, что после того, как отправитель отправляет данные, он не ждет, пока получатель отправит ответ, а затем отправляет следующий пакет данных.Обычно используется в бизнес-сценариях, где связь может занять много времени и чувствительна к ответу. Например, уведомление начинается после загрузки пользовательского видео. Сервис транскодирования.
односторонняя отправка
Односторонняя отправка означает, что он отвечает только за отправку сообщений, не дожидаясь ответа от сервера, и функция обратного вызова не запускается.Это подходит для некоторых сценариев, которые занимают очень короткое время, но не требуют высокой надежности, таких как сбор журнала .
продюсерская группа
Группа производителей представляет собой набор типов производителей. Этот тип производителей обычно отправляет тип сообщения, а логика отправки согласуется, поэтому эти производители сгруппированы вместе. С точки зрения структуры развертывания производитель помечает себя как кластер именем группы производителей.
потребитель
Потребители отвечают за потребление сообщений, а потребители извлекают информацию с сервера сообщений и вводят ее в пользовательское приложение. С точки зрения пользовательских приложений существует два типа потребителей: потребители-вытягиватели и потребители-выталкиватели.
тянуть потребителя
Pull Consumers активно извлекает информацию с сервера сообщений. Пока сообщения извлекаются пакетами, пользовательское приложение запускает процесс потребления, поэтому Pull называется активным потреблением.
подтолкнуть потребителя
Push Consumer инкапсулирует извлечение сообщений, ход потребления и другие внутренние работы по обслуживанию и оставляет интерфейс обратного вызова выполненным, когда сообщение поступает в пользовательское приложение для реализации. Поэтому Push называется типом пассивного потребления, но с точки зрения реализации он все равно извлекает сообщения с сервера сообщений.В отличие от Pull, Push должен сначала зарегистрировать прослушиватель потребления и начинать потреблять сообщения только тогда, когда прослушиватель срабатывает.
группа потребителей
Группа потребителей — это имя коллекции типа потребителей, которые обычно потребляют сообщения одного и того же типа и имеют одинаковую логику потребления, поэтому эти потребители группируются вместе. Подобно группе производителей, группа потребителей группирует одни и те же роли вместе и дает им имена. Группировка представляет собой очень тонкий концептуальный дизайн. RocketMQ реализует естественную балансировку нагрузки сообщений с помощью этого механизма группировки. При потреблении сообщений сообщения распределяются по нескольким экземплярам сервера-потребителя через группу потребителей.Например, если в теме 9 сообщений, а в одной группе потребителей 3 экземпляра (3 процесса или 3 машины), то в каждом экземпляре 3 сообщения будут разделены поровну. , что также означает, что мы можем легко добиться горизонтального расширения путем добавления машин.
сервер сообщений
Сервер сообщений (Брокер) — это центр хранения сообщений, его основная функция — получать и хранить сообщения от Продюсера, а Потребитель получает сообщения отсюда. Он также хранит метаданные, связанные с сообщениями, включая группы пользователей, смещения хода выполнения, информацию об очереди и многое другое. Из схемы структуры развертывания видно, что есть два типа брокеров: Master и Slave, Master может писать и читать, а Slave не может писать, но может читать. С точки зрения физической структуры существует четыре типа методов развертывания кластера брокера: один главный, несколько главных, несколько главных и несколько подчиненных (синхронная очистка) и несколько ведущих и несколько подчиненных (асинхронная очистка).
Один мастер
Таким образом, после перезапуска или выхода из строя брокера вся служба будет недоступна.Этот метод является рискованным, поэтому он явно не рекомендуется для онлайн-сред.
Несколько мастеров
Все серверы сообщений являются главными, а не подчиненными. Преимущество этого метода в том, что конфигурация проста, а время простоя или перезапуск одного Мастера не влияет на приложение. Недостатком является то, что во время простоя одной машины на неиспользованные сообщения на машине нельзя подписаться, пока машина не будет восстановлена, и это повлияет на характер сообщений в реальном времени.
Несколько мастеров и несколько ведомых (асинхронная репликация)
Каждый ведущий настроен с подчиненным, поэтому существует несколько пар ведущий-ведомый, и сообщения реплицируются асинхронно, а задержка сообщений между ведущим и подчиненным составляет миллисекунды. Преимущество этого метода заключается в том, что теряется очень мало сообщений, и это не влияет на производительность сообщений в реальном времени.После того, как ведущее устройство не работает, потребители могут продолжать потреблять из ведомого устройства.Промежуточный процесс прозрачен для пользовательского приложения. , никакого ручного вмешательства не требуется, а производительность такая же, как и у нескольких мастеров.Способ почти такой же. Недостатком является то, что очень мало сообщений теряется в случае повреждения диска, когда мастер выходит из строя.
Несколько мастеров и несколько ведомых устройств (синхронная двойная запись)
Каждый главный настроен с подчиненным, поэтому существует несколько пар ведущий-подчиненный.Сообщение принимает синхронный метод двойной записи, и мастер и резервная копия успешно записываются, прежде чем вернуть успех. Преимущество этого метода в том, что нет единой точки проблемы для данных и сервисов, нет задержки в сообщениях, когда Master не работает, а доступность сервисов и данных очень высока. Недостаток в том, что производительность немного ниже, чем у асинхронного метода репликации, а задержка отправки сообщений будет немного выше.
сервер имен
NameServer используется для хранения метаинформации, связанной с посредником, и для поиска информации о посреднике для производителя и потребителя. NameServer спроектирован таким образом, чтобы практически не сохранять состояние и масштабироваться горизонтально, без связи между узлами и путем развертывания нескольких компьютеров, чтобы пометить себя как псевдокластер. Каждый Брокер будет регистрироваться на Сервере имен при его запуске, Производитель будет получать информацию о маршрутизации Брокера от темы к Серверу имен перед отправкой сообщения, а Потребитель также будет регулярно получать информацию о маршрутизации темы. Таким образом, с функциональной точки зрения он должен быть похож на ZooKeeper.Говорят, что ранняя версия RocketMQ действительно использовала ZooKeeper, который позже был заменен на NameServer, реализованный сам по себе.
Информация
Сообщение – это информация, которая должна быть передана. Сообщение должно иметь тему (Topic), а тему можно рассматривать как адрес, на который должно быть отправлено ваше письмо. Сообщение также может иметь необязательную пару «тег» и «ключ-значение», которую можно использовать для установки бизнес-ключа и поиска этого сообщения в брокере для поиска проблем во время разработки.
тема
Тему (Topic) можно рассматривать как обычный тип сообщения, это первый тип сообщения. Например, систему электронной коммерции можно разделить на: новости транзакций, новости логистики и т. д. Сообщение должно иметь Тему. Тема имеет очень слабые отношения с производителями и потребителями.Тема может иметь 0, 1 или несколько производителей, отправляющих сообщения в нее, и производитель также может отправлять сообщения в разные темы одновременно. Тема также может быть подписана 0, 1 или несколькими потребителями.
Этикетка
Тег (Tag) можно рассматривать как подтему, это тип сообщения второго уровня, используемый для предоставления пользователям дополнительной гибкости. С помощью тегов сообщения одного и того же бизнес-модуля с разными целями могут быть идентифицированы одной и той же темой, но разными тегами. Например, сообщения о транзакциях можно разделить на: сообщения о создании транзакции, сообщения о завершении транзакции и т. д. Сообщение может не иметь тега. Теги помогают поддерживать чистоту и согласованность кода, а также помогают с системой запросов, предоставляемой RocketMQ.
очередь сообщений
Очередь сообщений (Message Queue), тема делится на одну или несколько подтем, то есть очередь сообщений. В теме можно установить несколько очередей сообщений. При отправке сообщения выполняется тема сообщения. RocketMQ будет опрашивать все очереди в теме для отправки сообщения. На следующем рисунке показана ситуация внутренних сообщений брокера:
шаблон потребления сообщений
Существует два режима потребления сообщений: кластеризация и широковещательная рассылка. По умолчанию используется кластерное потребление. В этом режиме потребительский кластер потребляет несколько очередей темы вместе, и очередь может потребляться только одним потребителем. млн. потребителей продолжают тратить. Широковещательное сообщение о потреблении будет отправлено каждому потребителю в группе потребителей для потребления.
порядок сообщений
Существует два типа порядка сообщений: последовательное потребление (Orderly) и параллельное потребление (Concurrently). Последовательное потребление означает, что порядок потребления сообщений такой же, как порядок, который производитель отправляет для каждой очереди сообщений, поэтому, если вы имеете дело со сценарием, в котором глобальный порядок является обязательным, вам необходимо убедиться, что в используемой теме есть только одно сообщение. очередь. Параллельное потребление больше не гарантирует порядок сообщений, а максимальное параллельное потребление ограничено пулом потоков, указанным каждым клиентом-потребителем.
Инженерный пример
Доступ к Java-экземпляру RocketMQ
RocketMQ в настоящее время поддерживает доступ на трех языках: Java, C++ и Go. По соглашению, возьмите язык Java в качестве примера, чтобы увидеть, как использовать RocketMQ для отправки и получения сообщений.
импортировать зависимости
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
Добавить поддержку клиентского доступа RocketMQ. Конкретная версия совпадает с установленной версией RocketMQ.
производитель сообщений
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个消息生产者,并设置一个消息生产者组
DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 Producer,整个应用生命周期内只需要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
"topic_example_java" /* 消息主题名 */,
"TagA" /* 消息标签 */,
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//发送消息并返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
producer.shutdown();
}
}
В примере класс DefaultMQProducer используется для создания производителя сообщений.Обычно приложение создает объект DefaultMQProducer, поэтому объект-производитель обычно поддерживается приложением, которое может быть установлено как глобальный объект или одноэлементный объект. Входным параметром productGroup этого конструктора класса является имя группы производителей сообщений. И производитель, и потребитель должны указать имя группы и обеспечить уникальность имени. Группа производителей мало влияет на отправку обычных сообщений. Будут введены распределенные транзакции. позже будет использовано сообщение.
Далее укажите адрес NameServer и вызовите метод start для инициализации.Метод start необходимо вызвать только один раз за весь жизненный цикл приложения.
После завершения инициализации вызовите метод send для отправки сообщения.В примере для отправки просто создается только 100 одинаковых сообщений.На самом деле объект Producer может отправлять сообщения с несколькими темами и несколькими тегами, а тег объект сообщения может быть пустым. Метод отправки является синхронным вызовом, если он не генерирует исключение, он указывает на успех.
Наконец, когда приложение завершает работу, вызовите метод завершения работы, чтобы очистить ресурсы, закрыть сетевое соединение и выйти из сервера.Обычно рекомендуется, чтобы приложение вызывало метод завершения работы в обработчике выхода таких контейнеров, как JBOSS и Tomcat. .
потребитель сообщений
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topic_example_java", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默认 list 里只有一条消息,可以通过设置参数来批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
В этом примере класс DefaultMQPushConsumer используется для создания потребителя сообщений.Такое приложение, как производитель, обычно создает объект DefaultMQPushConsumer, который обычно поддерживается приложением и может быть задан как глобальный объект или одноэлементный объект. Параметр ConsumerGroup этого конструктора класса является именем группы потребителей сообщений, и необходимо гарантировать уникальность имени.
Затем укажите адрес NameServer и задайте, начинает ли приложение-потребитель потреблять из головы очереди или из хвоста очереди, когда приложение-потребитель запускается в первый раз.
Затем вызовите метод подписки, чтобы подписать объект-потребитель на сообщения в указанной теме. Первый параметр этого метода — название темы, а второй — имя тега. В примере показано, что сообщение всех тегов в названии темы подписан.
Самое главное зарегистрировать прослушиватель сообщений для потребления сообщений.В примере используется метод Consumer Push, то есть метод установки обратного вызова слушателя для потребления сообщений.В Списке по умолчанию только одно сообщение метод обратного вызова слушателя, который может быть получен пакетами путем установки параметров информации.
Наконец, метод запуска вызывается для инициализации, а метод запуска нужно вызывать только один раз за весь жизненный цикл приложения.
Запустить сервер имен
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
Среди четырех основных компонентов RocketMQ Name Server и Broker предоставляются установочным пакетом RocketMQ, поэтому эти два приложения необходимо запустить для предоставления служб сообщений. Сначала запустите сервер имен, убедитесь, что на вашем компьютере установлен JDK, соответствующий RocketMQ, и установите переменную среды JAVA_HOME, а затем выполните mqnamesrv в каталоге bin в каталоге установки RocketMQ.По умолчанию выполнение команды Вывод в файл nohup.out в текущем каталоге и, наконец, отслеживание файла журнала для просмотра фактической работы сервера имен.
Начать Брокер
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
Также убедитесь, что на вашем компьютере установлен JDK, соответствующий RocketMQ, и установлена переменная среды JAVA_HOME, а затем выполните mqbroker в каталоге bin в каталоге установки RocketMQ.По умолчанию выполнение команды будет выводиться к текущему файлу nohup.out каталога и, наконец, проследите файл журнала, чтобы просмотреть фактическую работу брокера.
управлять потребителем
Сначала запустите класс Consumer, чтобы, когда производитель отправляет сообщение, вы могли видеть запись сообщения в серверной части потребителя. Если с конфигурацией проблем нет, вы увидите вывод, напечатанный на консоли.消息消费者已启动
запустить продюсер
Наконец, запустите класс Producer, и вы сможете увидеть полученные сообщения в консоли Consumer.
Spring интегрирует RocketMQ
В отличие от RabbitMQ, ActiveMQ, Kafka и другого промежуточного программного обеспечения сообщений, сообщество Spring обеспечивает интеграцию этих продуктов промежуточного программного обеспечения различными способами, например, интегрируя ActiveMQ через spring-jms, интегрируя RabbitMQ через spring-rabbit в рамках проекта Spring AMQP и интегрируя Spring. через spring -kafka Интеграция kafka, благодаря которой они могут более легко использовать ее API в проектах Spring. В настоящее время существует три способа интеграции RocketMQ в среду Spring: один — определить производителей и потребителей сообщений как bean-объекты, которыми будет управлять контейнер Spring, а другой — использовать внешний проект RocketMQ-jms сообщества RocketMQ. (https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms), а затем интегрируется и используется через spring-jms. В-третьих, если ваше приложение основано на spring-boot, вы можете использовать внешний RocketMQ проект Rocketmq-spring-boot-starter (https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter) более удобен для отправки и получения сообщений.
В общем, проект Rocketmq-jms реализует часть спецификации JMS 1.1 и в настоящее время поддерживает модель публикации/подписки в JMS для отправки и получения сообщений. Проект Rocketmq-spring-boot-starter в настоящее время поддерживает синхронную отправку, асинхронную отправку, одностороннюю отправку, последовательное потребление, параллельное потребление, кластерное потребление, широковещательное потребление и др. Существуют функции, которые соответствуют бизнес-требованиям для использования проекта. Конечно, самый гибкий способ использования API — это первый способ.Давайте возьмем первый способ в качестве примера, чтобы кратко увидеть, как Spring интегрирует RocketMQ.
производитель сообщений
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class SpringProducer {
private Logger logger = Logger.getLogger(getClass());
private String producerGroupName;
private String nameServerAddr;
private DefaultMQProducer producer;
public SpringProducer(String producerGroupName, String nameServerAddr) {
this.producerGroupName = producerGroupName;
this.nameServerAddr = nameServerAddr;
}
public void init() throws Exception {
logger.info("开始启动消息生产者服务...");
//创建一个消息生产者,并设置一个消息生产者组
producer = new DefaultMQProducer(producerGroupName);
//指定 NameServer 地址
producer.setNamesrvAddr(nameServerAddr);
//初始化 SpringProducer,整个应用生命周期内只需要初始化一次
producer.start();
logger.info("消息生产者服务启动成功.");
}
public void destroy() {
logger.info("开始关闭消息生产者服务...");
producer.shutdown();
logger.info("消息生产者服务已关闭.");
}
public DefaultMQProducer getProducer() {
return producer;
}
}
Производитель сообщений должен разделить жизненный цикл объекта производителя DefaultMQProducer на три метода: конструктор, инициализация и уничтожение.В конструкторе имя группы производителей и адрес NameServer предоставляются контейнером Spring в качестве переменных во время настройки, а метод DefaultMQProducer создается в методе init.Объект, установка адреса NameServer, инициализация объекта-производителя, метод уничтожения используется для очистки ресурсов при уничтожении объекта-производителя.
потребитель сообщений
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class SpringConsumer {
private Logger logger = Logger.getLogger(getClass());
private String consumerGroupName;
private String nameServerAddr;
private String topicName;
private DefaultMQPushConsumer consumer;
private MessageListenerConcurrently messageListener;
public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
this.consumerGroupName = consumerGroupName;
this.nameServerAddr = nameServerAddr;
this.topicName = topicName;
this.messageListener = messageListener;
}
public void init() throws Exception {
logger.info("开始启动消息消费者服务...");
//创建一个消息消费者,并设置一个消息消费者组
consumer = new DefaultMQPushConsumer(consumerGroupName);
//指定 NameServer 地址
consumer.setNamesrvAddr(nameServerAddr);
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe(topicName, "*");
//注册消息监听器
consumer.registerMessageListener(messageListener);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
logger.info("消息消费者服务启动成功.");
}
public void destroy(){
logger.info("开始关闭消息消费者服务...");
consumer.shutdown();
logger.info("消息消费者服务已关闭.");
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
}
Подобно производителю сообщений, потребитель сообщений делит жизненный цикл объекта производителя DefaultMQPushConsumer на три метода: конструктор, инициализация и уничтожение.Конкретное значение было введено при знакомстве с Java для доступа к экземплярам RocketMQ и не будет повторяться здесь. Конечно, с объектом-потребителем прослушиватель сообщений также должен выполнить определенную логику обработки после получения сообщения.
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class MessageListener implements MessageListenerConcurrently {
private Logger logger = Logger.getLogger(getClass());
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list != null) {
for (MessageExt ext : list) {
try {
logger.info("监听到消息 : " + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Класс прослушивателя сообщений должен извлечь анонимный внутренний код класса, объявленный при регистрации прослушивателя сообщений в предыдущем примере Java, и определить его как отдельный класс.
Файл конфигурации Spring
Поскольку для интеграции используется только платформа Spring, нет необходимости добавлять дополнительные зависимости в дополнение к пакету jar ядра Spring Framework. В этом примере производитель и потребитель сообщений разделены на два файла конфигурации, что позволяет лучше продемонстрировать эффект отправки и получения сообщений.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="producerGroupName" value="spring_producer_group"/>
</bean>
</beans>
Конфигурация производителя сообщений очень проста.Определяется объект производителя сообщений.При инициализации объекта вызывается метод init.Перед уничтожением объекта выполняется метод destroy для настройки адреса сервера имен и группы производителей.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />
<bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
<constructor-arg name="topicName" value="spring-rocketMQ-topic" />
<constructor-arg name="messageListener" ref="messageListener" />
</bean>
</beans>
Конфигурация потребителя сообщений аналогична конфигурации производителя сообщений, за исключением определения и привязки объекта прослушивателя сообщений.
Запустите пример программы
Запустите сервер имен и брокер в соответствии с предыдущими шагами, а затем запустите программы производителя и потребителя сообщений.Для простоты мы моделируем эти две программы с двумя классами модульных тестов:
package org.study.mq.rocketMQ.spring;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringProducerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
}
@Test
public void sendMessage() throws Exception {
SpringProducer producer = container.getBean(SpringProducer.class);
for (int i = 0; i < 20; i++) {
//创建一条消息对象,指定其主题、标签和消息内容
Message msg = new Message(
"spring-rocketMQ-topic",
null,
("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
);
//发送消息并返回结果
SendResult sendResult = producer.getProducer().send(msg);
System.out.printf("%s%n", sendResult);
}
}
}
Класс SpringProducerTest имитирует отправку сообщений производителями сообщений.
package org.study.mq.rocketMQ.spring;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringConsumerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
}
@Test
public void consume() throws Exception {
SpringConsumer consumer = container.getBean(SpringConsumer.class);
Thread.sleep(200 * 1000);
consumer.destroy();
}
}
Класс SpringConsumerTest имитирует потребителя сообщений для получения сообщений.Прежде чем метод потребления вернется, текущий поток должен заснуть в течение определенного периода времени, чтобы программа-потребитель могла продолжать жить, чтобы отслеживать сообщения, отправленные производителем.
Запустите класс SpringProducerTest и класс SpringConsumerTest соответственно, и вы увидите полученные сообщения в консоли SpringConsumerTest:
Если вы запустите два процесса класса SpringConsumerTest, поскольку они принадлежат к одной и той же группе потребителей, вы можете увидеть, что они распространяются на сообщение в консоли SpringConsumerTest: