RocketMQ — промежуточное программное обеспечение для распределенных сообщений. Первоначально оно было разработано командой промежуточного программного обеспечения сообщений Alibaba и широко применялось в производственной системе для удовлетворения потребностей массового накопления онлайн-сообщений. Оно было передано в дар Apache Open Source Foundation в конце В 2016 году он стал инкубационным проектом, а менее чем через год официально стал проектом верхнего уровня Apache.
Сначала Али разрабатывал систему обмена сообщениями на основе ActiveMQ. По мере увеличения масштабов бизнес-сообщений постепенно возникали узкие места. Позже рассматривалась Kafka, но из-за отсутствия выбора с точки зрения низкой задержки и высокой надежности она Наконец разработал RocketMQ самостоятельно.Производительность RocketMQ лучше, чем у существующих очередей сообщений.RocketMQ и Kafka очень похожи по концепции и принципу, поэтому их часто сравнивают; RocketMQ по умолчанию использует режим длительного опроса, а единый машина поддерживает десятки миллионов сообщений, что может быть очень хорошо применено в системах массовых сообщений.
Эта статья разделена на три части, как показано на следующем рисунке:
1 Установите RocketMQ — версия для Windows
(1) Загрузите установочный пакет Windows
Адрес загрузки версии для Windows:ракета в настоящее время.apache.org/release_not…
Загрузите и разархивируйте установочный пакет RocketMQ.
(2) Настройка переменных системной среды
Настройте системную переменную RocketMQ_HOME = "D: \ Soft \ RocketMQ-All-4.5.1-bin-release", как показано на следующем рисунке:
Примечание. У всех разные каталоги хранения robotmq, мой находится в папке D:\soft, пользователи настраивают соответствующие системные переменные в соответствии со своей средой.
Поскольку переменная среды %ROCKETMQ_HOME% используется при следующем запуске mqnamesrv.cmd, эту системную переменную необходимо настроить здесь.
(3) Запустить сервер имен
Войдите в каталог bin RocketMQ и выполнитеstart mqnamesrv.cmd
, выполнение проходит успешно, как показано на следующем рисунке:
Примечание. После запуска это окно нельзя закрыть.
(4) Запустите брокера
Или выполните его в каталоге binstart mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
, выполнение проходит успешно, как показано на следующем рисунке:
Также не закрывайте указанное выше окно запуска.
Выполните следующие шаги, указывая, что ваш RocketMQ был успешно выполнен.
2 Установите плагин визуализации
(1) Загрузите плагин
открытое соединениеGitHub.com/Apache/рок…Загрузите плагин визуализации rockemq-externals, как показано ниже:
Нажмите Загрузить ZIP, чтобы загрузить.
Я подготовил ссылку для скачивания Baidu Cloud в Китае, чтобы все могли ее использовать. Ссылка на байду:disk.baidu.com/is/1poem MO6W-56… Код извлечения: нечеткий
(2) Настройте плагин
После завершения загрузки перейдите в rocketmq-externals\rocketmq-console\src\main\resources\application.properties для настройки, как показано на следующем рисунке:
Основные поля описываются следующим образом:
- server.port=8066: порт, на котором работает этот плагин визуализации.
- Rocketmq.config.namesrvAddr=127.0.0.1:9876: Информация о ссылке для RocketMQ.
(3) Скомпилируйте плагин
Перейдите в папку rocketmq-externals\rocketmq-console и выполнитеmvn clean package -Dmaven.test.skip=true
Скомпилируйте проект.
Успешная компиляция показана ниже:
На этапе компиляции могут возникнуть следующие две проблемы: команда mvn не найдена или компиляция выполняется очень медленно.Предусмотрены следующие решения.
Вопрос 1: mvn не является командой, которую можно запустить
Решение: это связано с тем, что Maven не установлен или переменные среды Maven не настроены. Загрузите установочный пакет Maven и увеличьте переменные средыMAVEN_HOME=maven安装目录
, добавить в путь%MAVEN_HOME%\bin
, перезапустите инструмент командной строки (CMD), чтобы повторно выполнить команду.
Проблема 2: Проблема сверхмедленной компиляции
Решение: Это вызвано проблемой использования источника данных Maven в качестве внешнего источника.Вам нужно только настроить источник Maven Али.
Откройте conf/setting.xml в каталоге Maven и добавьте следующее в узел зеркал:
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
(4) Запустите плагин
После успешной компиляции перейдите в целевую папку и выполнитеjava -jar rocketmq-console-ng-1.0.1.jar
стартовая программа.
После успешного запуска введите адрес в браузереhttp://127.0.0.1:8066Доступ, эффект следующий:
3 Основное использование
(1) Добавьте эталонный пакет jar
pom.xml добавьте следующий код:
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
</dependency>
(2) Добавьте код производителя и потребителя
public class RocketMQDemo {
static final String MQ_NAMESRVADDR = "localhost:9876";
public static void main(String[] args) {
// 分组名
String groupName = "myGroup-1";
// 主题名
String topicName = "myTopic-1";
// 标签名
String tagName = "myTag-1";
new Thread(() -> {
try {
producer(groupName, topicName, tagName);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQClientException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
consumer(groupName, topicName, tagName);
} catch (MQClientException e) {
e.printStackTrace();
}
}).start();
}
/**
* @Description 生产者
* @Author wanglei
* @Param [groupName 分组名, topicName 主题名, tagName 标签名]
**/
public static void producer(String groupName, String topicName, String tagName) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(MQ_NAMESRVADDR);
producer.start();
String body = "Hello, 老王";
Message message = new Message(topicName, tagName, body.getBytes());
producer.send(message);
producer.shutdown();
}
/**
* @Description 消费者
* @Author wanglei
* @Param [groupName 分组名, topicName 主题名, tagName 标签名]
**/
public static void consumer(String groupName, String topicName, String tagName) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(MQ_NAMESRVADDR);
consumer.subscribe(topicName, tagName);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Результат выполнения вышеуказанной программы следующий:
Привет, фараон