Быстрый старт RocketMQ

задняя часть Spring Kafka RocketMQ
Быстрый старт RocketMQ

В предыдущих статьях было показано, почему был выбран RocketMQ, и некоторые сравнения с kafka:Сравнение преимуществ Ali RocketMQ, чтобы вы могли иметь простое общее представление о RocketMQ, а затем представить:Сценарии применения MQ, дайте нам знать, когда можно использовать MQ и какие проблемы можно решить, а затем представим:Конфигурация развертывания кластера RocketMQ; После того, как эта статья продолжит содержание предыдущей статьи, я познакомлю вас с RocketMQ Quick Start.

как пользоваться

1. Внедрить RocketMQ-клиент

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.1.0-incubating</version>
</dependency>

2. Написать продюсеру

 DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
       //指定NameServer地址
        producer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的

        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();

        for (int i = 0; i < 997892; i++) {
            try {
                //构建消息
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                //发送同步消息
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }


producer.shutdown();

3. Напишите потребителю

/**
 * Consumer Group,非常重要的概念,后续会慢慢补充
 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.116.115:9876;192.168.116.116:9876"); //修改为自己的

/**
 * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
 * 如果非第一次启动,那么按照上次消费的位置继续消费
 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
	ConsumeConcurrentlyContext context) {
	try {
	    for(MessageExt msg:msgs){
		String msgbody = new String(msg.getBody(), "utf-8");
		System.out.println("  MessageBody: "+ msgbody);//输出消息内容
	    }
	} catch (Exception e) {
	    e.printStackTrace();
	    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
	}
	return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
    }
});


consumer.start();

System.out.printf("Consumer Started.%n");

4. Описание

Вы можете изменить значение NamesrvAddr в соответствии с вашей средой.Что касается моего кластера, см.:Конфигурация развертывания кластера RocketMQ. Позже, через консоль RocketMQ, вы можете увидеть режим multi-Master multi-slave, построенный ранее, и режим кластера асинхронной репликации.

5. Через консоль RocketMQ

Способ получить RocketMQ-Console-NG:rocketmq-console-ng, а затем скомпилировать и получить jar через mavne, команда выглядит следующим образом:

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

После получения Rocketmq-console-ng-1.0.0.jar найдите файл Rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties и измените Rocketmq.config.namesrvAddr в соответствии с вашим собственным NamesrvAddr. ценность.

Начните напрямую:

java -jar rocketmq-console-ng-1.0.0.jar
Консоль основана на springboot.Действительно, springboot очень удобен и очень популярен, поэтому необходимо изучить springboot (по сути, это все же серия spring, так что spring тоже нужно глубоко изучить), а потом наблюдать и запустить через консоль.

6. Наблюдение за работой

Хорошая привычка — запускать сначала Consumer, потом Producer, а потом наблюдать через консоль Rocketmq-console-ng.

运行中截图

После завершения операции верно, что данные брокера-а плюс объем данных брокера-б равны количеству данных, которые мы отправляем, а количество подчиненных также равно количеству мастеров. Эффект следующий:

运行完成

Глядя на отправку этих данных, состояние дисков двух машин выглядит следующим образом:RocketMQ1 занимает место на диске

RocketMQ2 занимает место на диске

На этом краткое руководство по RocketMQ закончено, продолжение следует...

Если вы найдете это полезным после прочтения, ставьте лайк и подписывайтесь.


Личный публичный аккаунт, добро пожаловать, чтобы обратить внимание и проверить более замечательную историю! ! !