В предыдущих статьях было показано, почему был выбран RocketMQ, и некоторые сравнения с kafka:Сравнение преимуществ Ali RocketMQ, чтобы вы могли иметь простое общее представление о RocketMQ, а затем представить:Сценарии применения MQ, дайте нам знать, когда можно использовать MQ и какие проблемы можно решить, а затем представим:Конфигурация развертывания кластера RocketMQ; После того, как эта статья продолжит содержание предыдущей статьи, я познакомлю вас с RocketMQ Quick Start.
как пользоваться
1. Внедрить RocketMQ-клиент
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>
2. Написать продюсеру
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//指定NameServer地址
producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
for (int i = 0; i < 997892; i++) {
try {
//构建消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送同步消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
3. Напишите потребителю
/**
* Consumer Group,非常重要的概念,后续会慢慢补充
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg:msgs){
String msgbody = new String(msg.getBody(), "utf-8");
System.out.println(" MessageBody: "+ msgbody);//输出消息内容
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4. Описание
Вы можете изменить значение NamesrvAddr в соответствии с вашей средой.Что касается моего кластера, см.:Конфигурация развертывания кластера RocketMQ. Позже, через консоль RocketMQ, вы можете увидеть режим multi-Master multi-slave, построенный ранее, и режим кластера асинхронной репликации.
5. Через консоль RocketMQ
Способ получить RocketMQ-Console-NG:rocketmq-console-ng, а затем скомпилировать и получить jar через mavne, команда выглядит следующим образом:
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar
После получения Rocketmq-console-ng-1.0.0.jar найдите файл Rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties и измените Rocketmq.config.namesrvAddr в соответствии с вашим собственным NamesrvAddr. ценность.
Начните напрямую:
java -jar rocketmq-console-ng-1.0.0.jar
Консоль основана на springboot.Действительно, springboot очень удобен и очень популярен, поэтому необходимо изучить springboot (по сути, это все же серия spring, так что spring тоже нужно глубоко изучить), а потом наблюдать и запустить через консоль.6. Наблюдение за работой
Хорошая привычка — запускать сначала Consumer, потом Producer, а потом наблюдать через консоль Rocketmq-console-ng.
После завершения операции верно, что данные брокера-а плюс объем данных брокера-б равны количеству данных, которые мы отправляем, а количество подчиненных также равно количеству мастеров. Эффект следующий:
Глядя на отправку этих данных, состояние дисков двух машин выглядит следующим образом:RocketMQ1 занимает место на диске
RocketMQ2 занимает место на дискеНа этом краткое руководство по RocketMQ закончено, продолжение следует...
Если вы найдете это полезным после прочтения, ставьте лайк и подписывайтесь.
Личный публичный аккаунт, добро пожаловать, чтобы обратить внимание и проверить более замечательную историю! ! !