Подробная настройка и использование RocketMQ

задняя часть RocketMQ

1. Введение в MQ

1.1 Зачем использовать MQ

Очередь сообщений — это структура данных «первым пришел — первым вышел».

在这里插入图片描述

Сценарии его применения в основном включают следующие три аспекта.

1) Разделение приложений

Чем выше связанность системы, тем ниже отказоустойчивость. Возьмем в качестве примера приложение электронной коммерции, после того как пользователь создаст заказ, если система инвентаризации, система логистики и платежная система будут связаны и запущены, и какая-либо подсистема выйдет из строя или временно недоступна из-за обновлений и по другим причинам, процесс заказа работа будет ненормальной, что повлияет на использование пользователем.

在这里插入图片描述

Использование разъединения очереди сообщений улучшит связанность системы. Например, если логистическая система выходит из строя, ремонт занимает несколько минут, за это время данные, которые должны быть обработаны логистической системой, кэшируются в очереди сообщений, и операция заказа пользователя завершается в обычном режиме. Когда логистическая система отвечает, достаточно дополнить и обработать сообщения о заказах, хранящиеся в очереди сообщений, и терминальная система не может определить, что логистическая система вышла из строя в течение нескольких минут.

在这里插入图片描述

2) Ограничение пикового трафика

在这里插入图片描述

Если система приложений сталкивается с внезапным всплеском трафика системных запросов, это может привести к перегрузке системы. С помощью очереди сообщений большое количество запросов может кэшироваться и обрабатываться в течение длительного периода времени, что может значительно повысить стабильность системы и удобство работы пользователей.

在这里插入图片描述

В общем, для обеспечения стабильности системы, если загрузка системы превышает пороговое значение, запрос пользователя будет заблокирован, что повлияет на работу пользователя, а если запрос кэшируется с использованием очереди сообщений, и система будет уведомлять пользователя о том, что заказ выполнен после обработки. Лучше никогда не размещать заказ.

В экономических целях:

Если QPS бизнес-системы в обычные часы составляет 1000, а самый высокий пик трафика — 10000, то явно нерентабельно настраивать высокопроизводительный сервер, чтобы справиться с пиком трафика, в этом случае вы можете использовать сообщение очереди для снижения пикового трафика.

3) Распределение данных

在这里插入图片描述

Данные могут передаваться между несколькими системами через очереди сообщений. Генератору данных не нужно заботиться о том, кто использует данные, ему нужно только отправить данные в очередь сообщений, а потребитель данных может напрямую получить данные из очереди сообщений.

在这里插入图片描述

1.2 Преимущества и недостатки MQ

Преимущества: развязка, отсечение пиков, распределение данных

Недостатки включают следующее:

  • Снижение доступности системы

    Чем больше внешних зависимостей вводит система, тем хуже стабильность системы. Как только MQ выйдет из строя, это повлияет на бизнес.

    Как обеспечить высокую доступность MQ?

  • Повышенная сложность системы

    Добавление MQ значительно увеличивает сложность системы.В прошлом синхронные удаленные вызовы выполнялись между системами, но теперь асинхронные вызовы выполняются через MQ.

    Как сделать так, чтобы сообщения не использовались повторно? Как бороться с потерянными сообщениями? Так упорядоченность доставки сообщений гарантирована?

  • Проблемы согласованности

    После того, как система A обработает бизнес, она отправляет данные сообщения в системы B, C и D через MQ. Если система B и система C обрабатывают успешно, система D дает сбой.

    Как обеспечить согласованность обработки данных сообщений?

1.3 Сравнение различных продуктов MQ

Общие продукты MQ включают Kafka, ActiveMQ, RabbitMQ, RocketMQ.

在这里插入图片描述

Во-вторых, быстрый старт RocketMQ

RocketMQ – это промежуточное ПО MQ от Alibaba, выпущенное в 2016 году. Оно разработано на языке Java. Внутри Alibaba RocketMQ берет на себя поток сообщений в сценариях с высокой степенью параллелизма, таких как "Double 11", и может обрабатывать триллионы сообщений.

2.1 Подготовка

2.1.1 Скачать RocketMQ

Выбранная здесь версия RocketMQ: 4.6.0

ссылка для скачивания:ссылка для скачивания

Официальная документация:ракета в настоящее время.apache.org/docs/quick-…

2.2.2 Требования к окружающей среде

  • Linux64-битная система

  • JDK1.8 (64-разрядная версия)

2.2 Установите RocketMQ

2.2.1 Этапы установки

Я установил его здесь как бинарный пакет:

  1. Разархивируйте установочный пакет
  2. Введите каталог установки

2.2.2 Введение в каталог

  • bin: сценарии запуска, включая сценарии оболочки и сценарии CMD.
  • conf: файлы конфигурации экземпляра, включая файлы конфигурации брокера, файлы конфигурации журнала и т. д.
  • lib: зависит от пакетов jar, включая Netty, commons-lang, FastJSON и т. д.

2.3 Запустите RocketMQ

  1. RocketMQПамять виртуальной машины по умолчанию велика, запуститеBrokerилиNameServerЭто может привести к сбою из-за нехватки памяти, поэтому вам нужно отредактировать следующие два файла конфигурации, чтобы изменить размер памяти JVM.

    # 编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
    $ vi bin/runbroker.sh
    	# 参考设置
    	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    
    $ vi bin/runserver.sh
    	# 参考设置
    	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
  2. запускатьNameServer

    # 1.启动NameServer
    nohup sh bin/mqnamesrv &
    # 2.查看启动日志
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
  3. запускатьBroker

    # 1.启动Broker
    nohup sh bin/mqbroker -n localhost:9876 &
    # 2.查看启动日志
    tail -f ~/logs/rocketmqlogs/broker.log 
    

bin/mqbrokerНекоторые необязательные параметры:

  • -c: укажите путь к файлу конфигурации
  • -n: адрес NameServer

2.4 Тест RocketMQ

2.4.1 Отправка сообщения

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.4.2 Получение сообщений

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.5 Закрыть RocketMQ

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

2.6 Введение каждой роли

  • Producer: отправитель сообщения; пример: отправитель
  • Consumer: получатель сообщения; пример: получатель
  • Consumer Group: группа потребителей; каждый экземпляр потребителя принадлежит группе потребителей, и каждое сообщение будет потребляться только одним экземпляром потребителя в той же группе потребителей. (Разные группы потребителей могут получать одно и то же сообщение одновременно)
  • Broker: временное хранение и передача сообщений, пример: курьерская компания
  • NameServer: Management Broker; пример: управляющее агентство курьерской компании.
  • Topic: различать типы сообщений; отправитель может отправлять сообщения в одну или несколько тем; получатели сообщения могут подписаться на одно или несколько сообщений в теме.
  • Message Queue: Эквивалент раздела темы; используется для параллельной отправки и получения сообщений.

在这里插入图片描述

2.7 Подробное объяснение файла конфигурации брокера

Расположение файла конфигурации брокера по умолчанию:conf/broker.conf

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2.8 Строительство платформы визуального мониторинга

2.8.1 Обзор

RocketMQСуществует проект с открытым исходным кодом, который расширяет его.incubator-rocketmq-externals, этот проект имеет подмодуль с именемrocketmq-console, это проект консоли управления, первыйincubator-rocketmq-externalsТянуть к местным, потому что нам нужноrocketmq-consoleСкомпилируйте, упакуйте и запустите.

在这里插入图片描述

2.8.2 Загрузите и скомпилируйте пакет

  1. клонировать проект
git clone https://github.com/apache/rocketmq-externals
  1. существуетrocketmq-consoleСредняя конфигурацияnamesrvАдрес кластера:
$ cd rocketmq-console
$ vim src/main/resources/application.properties
	rocketmq.config.namesrvAddr=10.211.55.4:9876
  1. Конфигурация завершена, скомпилирована и упакована

    mvn clean package -Dmaven.test.skip=true
    
  2. Запускаем RocketMQ-консоль:

    nohup java -jar rocketmq-console-ng-2.0.0.jar > tmp.log &
    

После успешного запуска мы можем получить к нему доступ через браузерhttp://IP地址:8080Войдите в интерфейс консоли, как показано ниже:

在这里插入图片描述

3. Пример отправки и потребления сообщений (Maven)

  • Импорт зависимостей клиента MQ

    ==Примечание==:rocketmq-clientверсия для использования сRocketMQизта же версия

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.0</version>
    </dependency>
    
  • Анализ шагов отправителя сообщения:

    1. Создать производителя сообщенийproducerи укажите имя группы производителей
    2. уточнитьNameserverадрес
    3. запускатьproducer
    4. Создать объект сообщения, указав темуTopic,Tagи тело сообщения
    5. отправлять сообщения
    6. Закрыть производителяproducer
  • Анализ шагов потребителя сообщения:

    1. создать потребителяConsumer, сформулируйте название группы потребителей
    2. уточнитьNameserverадрес
    3. Подписывайтесь на темыTopicиTag
    4. Установите функцию обратного вызова для обработки сообщения
    5. начать потребительconsumer

3.1 Базовый пример

3.1.1 Отправка сообщения

1) Отправить сообщение синхронизации

Этот надежный метод синхронной отправки широко используется, например: уведомление о важном сообщении, уведомление по SMS.

public class SyncProducer {
	public static void main(String[] args) throws Exception {
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
    	producer.setNamesrvAddr("localhost:9876");
    	// 设置消息同步发送失败时的重试次数,默认为 2
        producer.setRetryTimesWhenSendFailed(2);
        // 设置消息发送超时时间,默认3000ms
        producer.setSendMsgTimeout(3000);
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
    	    // 创建消息,并指定Topic,Tag和消息体
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

2) Отправлять асинхронные сообщения

Асинхронные сообщения обычно используются в бизнес-сценариях, чувствительных ко времени ответа, то есть отправитель не может допустить длительного ожидания ответа брокера.

public class AsyncProducer {
	public static void main(String[] args) throws Exception {
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 设置消息异步发送失败时的重试次数,默认为 2
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // 设置消息发送超时时间,默认3000ms
        producer.setSendMsgTimeout(3000);
    	// 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
    	for (int i = 0; i < 100; i++) {
                final int index = i;
            	// 创建消息,并指定Topic,Tag和消息体
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
      	              System.out.printf("%-10d Exception %s %n", index, e);
      	              e.printStackTrace();
                    }
            	});
    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

3) Отправка сообщений в одном направлении

Этот метод в основном используется в сценариях, когда вы не особенно заботитесь об отправке результатов, таких как отправка журнала.

public class OnewayProducer {
	public static void main(String[] args) throws Exception{
    	// 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
        	// 创建消息,并指定Topic,Tag和消息体
        	Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送单向消息,没有任何返回结果
        	producer.sendOneway(msg);

    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

3.1.2 Использование сообщений

1) Кластерный режим (балансировка нагрузки)

Потребители потребляют сообщения в кластерном режиме, == сообщение будет потреблять только один потребитель в одной и той же группе потребителей ==

public static void main(String[] args) throws Exception {
    // 实例化消息生产者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // 指定Namesrv地址信息.
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅Topic
    consumer.subscribe("Test", "*");
    //负载均衡模式消费
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消息者
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

2) Режим трансляции

Потребители потребляют сообщения путем широковещательной рассылки == сообщение должно потребляться каждым потребителем в одной и той же группе потребителей ==

public static void main(String[] args) throws Exception {
    // 实例化消息生产者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // 指定Namesrv地址信息.
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅Topic
    consumer.subscribe("Test", "*");
    //广播模式消费
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消息者
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

3.2 Сообщения о последовательности

Порядок сообщений означает, что сообщения могут потребляться в том порядке, в котором они были отправлены (FIFO). RocketMQ может строго гарантировать порядок сообщений, который можно разделить на порядок разделов или глобальный порядок.

Анализ принципа последовательного потребления.По умолчанию при отправке сообщения будет применяться метод опроса Round Robin для отправки сообщения в разные очереди (очереди разделов), а при потреблении сообщений сообщения будут извлекаться из нескольких очередей.В этом случае , сообщения отправляются И потребление не гарантирует порядка. Однако, если сообщения заказа, отправляемые элементом управления, отправляются только в одну и ту же очередь по очереди и извлекаются из этой очереди только по очереди при потреблении, порядок гарантируется. Когда в отправке и потреблении задействована только одна очередь, она упорядочивается глобально; если участвует несколько очередей, она секционируется, то есть сообщения упорядочиваются относительно каждой очереди.

Ниже приведен пример секционирования, упорядоченного по порядку. Последовательный поток заказа: создать, оплатить, отправить, завершить. Сообщения с одинаковым номером заказа будут отправляться в одну и ту же очередь одно за другим, при потреблении один и тот же OrderId должен быть получен из одной и той же очереди.

3.2.1 Последовательное производство сообщений

/**
* Producer,发送顺序消息
*/
public class Producer {

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       String[] tags = new String[]{"TagA", "TagC", "TagD"};

       // 订单列表
       List<OrderStep> orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }

       producer.shutdown();
   }

   /**
    * 订单的步骤
    */
   private static class OrderStep {
       private long orderId;
       private String desc;

       public long getOrderId() {
           return orderId;
       }

       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }

       public String getDesc() {
           return desc;
       }

       public void setDesc(String desc) {
           this.desc = desc;
       }

       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }

   /**
    * 生成模拟订单数据
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       return orderList;
   }
}

3.2.2 Последовательное использование сообщений

/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new 
           DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       /**
        * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        * 如果非第一次启动,那么按照上次消费的位置继续消费
        */
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest", "TagA || TagC || TagD");

       consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }

               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

       consumer.start();

       System.out.println("Consumer Started.");
   }
}

3.3 Задержанные сообщения

Например, в электронной коммерции после отправки заказа вы можете отправить отложенное сообщение, проверить статус заказа через 1 час и отменить заказ, чтобы освободить запасы, если он еще не оплачен.

3.3.1 Запуск получателя сообщений

public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
      // 实例化消费者
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
      // 订阅Topics
      consumer.subscribe("TestTopic", "*");
      // 注册消息监听者
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  // Print approximate delay time period
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // 启动消费者
      consumer.start();
  }
}

3.3.2 Отправка задержанных сообщений

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // 实例化一个生产者来产生延时消息
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // 启动生产者
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
          message.setDelayTimeLevel(3);
          // 发送消息
          producer.send(message);
      }
       // 关闭生产者
      producer.shutdown();
  }
}

###4.3.3 Аутентификация

Вы увидите, что сообщение потребляется на 10 секунд позже, чем оно было сохранено.

3.3.4 Ограничения на использование

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

Теперь RocketMq не поддерживает временную задержку, вам необходимо установить несколько фиксированных уровней задержки, от 1 с до 2 часов, соответствующих уровням с 1 по 18.

3.4 Массовые сообщения

Пакетная отправка сообщений может значительно повысить производительность доставки небольших сообщений. Ограничение состоит в том, что эти пакетные сообщения должны иметь одну и ту же тему, один и тот же waitStoreMsgOK и не могут быть отложенными сообщениями. Кроме того, общий размер этого пакета сообщений не должен превышать 4 МБ.

3.4.1 Отправка массовых сообщений

Если вы отправляете не более4MBсообщения, легко использовать пакетную обработку, например:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //处理error
}

Если общая длина сообщения может превышать4MBВ это время лучше разделить сообщение

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

3.5 Фильтрация сообщений

В большинстве случаев TAG — это простой и полезный дизайн для выбора нужного сообщения. Например:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

Потребители будут получать сообщения, содержащие TAGA, TAGB или TAGC. Но ограничение состоит в том, что сообщение может иметь только один тег, что может не сработать для сложных сценариев. В этом случае сообщения можно фильтровать с помощью выражений SQL. Возможности SQL можно вычислить по атрибутам при отправке сообщения. В соответствии с синтаксисом, определенным RocketMQ, может быть реализована некоторая простая логика. Ниже приведен пример:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

3.5.1 Базовый синтаксис SQL

RocketMQ определяет только базовый синтаксис для поддержки этой функции. Вы также можете легко расширить его.

  • Числовые сравнения, такие как:>,>=,<,<=,BETWEEN,=
  • Сравнение символов, например:=,<>,IN
  • IS NULLилиIS NOT NULL
  • логический символAND,OR,NOT

Типы постоянной поддержки:

  • Числовые значения, такие как:123,3.1415
  • символы, такие как:'abc', должен быть заключен в одинарные кавычки
  • NULL, специальная константа
  • логическое значение,TRUEилиFALSE

использовать толькоpushТолько потребители схемы могут использовать оператор sql с использованием стандарта SQL 92. Интерфейс выглядит следующим образом:

public void subscribe(finalString topic, final MessageSelector messageSelector)

3.5.2 Производитель сообщений

При отправке сообщения вы можетеputUserPropertyустановить свойства сообщения

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

3.5.3 Потребитель сообщений

Используйте MessageSelector.bySql для фильтрации сообщений с помощью sql

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

3.6 Транзакционные сообщения

3.6.1 Анализ процесса

在这里插入图片描述

На приведенном выше рисунке показана общая схема сообщений о транзакциях, которые разделены на два процесса: отправка и отправка обычных сообщений о транзакциях и процесс компенсации сообщений о транзакциях.

####1) Отправка и отправка сообщения о транзакции

(1) Отправить сообщение (половина сообщения).

(2) Сервер отвечает на результат написания сообщения.

(3) Выполнить локальную транзакцию в соответствии с результатом отправки (если запись не удалась, полусообщение в это время невидимо для бизнеса, и локальная логика не выполняется).

(4) Выполнить фиксацию или откат в соответствии с состоянием локальной транзакции (операция фиксации создает индекс сообщения, и сообщение видно потребителям).

2) Компенсация сделки

(1) Для сообщений о транзакциях без фиксации/отката (сообщения в состоянии ожидания) инициируйте «обратную проверку» с сервера.

(2) Производитель получает ответное сообщение и проверяет статус локальной транзакции, соответствующей ответному сообщению.

(3) Повторная фиксация или откат в соответствии с локальным статусом транзакции

Среди них фаза компенсации используется для устранения тайм-аута или отказа сообщения Commit или Rollback.

3) Статус сообщения о транзакции

Существует три состояния сообщений транзакций: состояние фиксации, состояние отката и промежуточное состояние:

  • TransactionStatus.CommitTransaction: фиксирует транзакцию, которая позволяет потребителю использовать это сообщение.
  • TransactionStatus.RollbackTransaction: транзакция отката, что означает, что сообщение будет удалено и не будет разрешено для использования.
  • TransactionStatus.Unknown: промежуточный статус, который представляет необходимость проверки очереди сообщений для определения статуса.

3.6.2 Отправка транзакционных сообщений

1) Создать транзакционного производителя

использоватьTransactionMQProducerкласс для создания производителя и указания уникальногоProducerGroup, вы можете настроить собственный пул потоков для обработки этих запросов на проверку. После выполнения локальной транзакции вам необходимо ответить в очередь сообщений в соответствии с результатом выполнения. Пожалуйста, обратитесь к предыдущему разделу для получения статуса возвращенной транзакции.

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //创建消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("group6");
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //生产者这是监听器
        producer.setTransactionListener(transactionListener);
        //启动消息生产者
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                TimeUnit.SECONDS.sleep(1);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //producer.shutdown();
    }
}

2) Реализовать интерфейс прослушивания транзакции

Когда отправка половинного сообщения прошла успешно, мы используемexecuteLocalTransactionспособ выполнения локальных транзакций. Он возвращает одно из трех состояний транзакции, упомянутых в предыдущем разделе.checkLocalTranscationМетод используется для проверки состояния локальной транзакции и ответа на запрос проверки очереди сообщений. Он также возвращает одно из трех состояний транзакции, упомянутых в предыдущем разделе.

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("执行本地事务");
        if (StringUtils.equals("TagA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }

    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3.6.3 Ограничения на использование

  1. Транзакционные сообщения не поддерживают отложенные сообщения и массовые сообщения.
  2. Чтобы одно сообщение не проверялось слишком много раз и не приводило к накоплению сообщений половинной очереди, мы по умолчанию ограничиваем количество проверок одного сообщения до 15, но пользователи могут передать файл конфигурации брокера.transactionCheckMaxпараметр для изменения этого ограничения. Если сообщение было проверено более N раз ( N =transactionCheckMax), Брокер отклонит это сообщение и по умолчанию также распечатает журнал ошибок. Пользователь может переопределитьAbstractTransactionCheckListenerкласс, чтобы изменить это поведение.
  3. Сообщения о транзакциях будут проверяться через определенный промежуток времени, как параметр transactionMsgTimeout в файле конфигурации брокера. При отправке транзакционных сообщений пользователи также могут устанавливать свойства пользователя,CHECK_IMMUNITY_TIME_IN_SECONDSчтобы изменить это ограничение, этот параметр имеет приоритет надtransactionMsgTimeoutпараметр.
  4. Транзакционные сообщения могут быть проверены или использованы более одного раза.
  5. Сообщение целевой темы, отправленное пользователю, может завершиться ошибкой, в настоящее время в зависимости от ведения журнала. его высокая доступность благодаряRocketMQЭто гарантируется собственным механизмом высокой доступности.Если вы хотите гарантировать, что сообщения транзакций не будут потеряны и гарантируется целостность транзакций, рекомендуется использовать синхронный механизм двойной записи.
  6. Идентификаторы производителей для транзакционных сообщений не могут использоваться совместно с идентификаторами производителей для других типов сообщений. В отличие от других типов сообщений, транзакционные сообщения допускают обратный поиск, а серверы MQ могут запрашивать потребителей по их идентификатору производителя.

3.7 Настройка AK и Secret при подключении к Alibaba Cloud RocketMQ

Если вы звоните в Alibaba CloudRocketMQ, также необходимо указатьAKиSecret. Облачная демонстрация Alibaba:кликните сюда

3.7.1 Производители

Настройки производителяAKиSecertОперация такая же, просто нужно создатьProducerПросто укажите время, вот пример отправки обычного сообщения:

public class SyncAKProducer {
	private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("设置自己的ACCESS_KEY", "设置自己的SECRET_KEY"));
    }

	public static void main(String[] args) throws Exception {
		/**
         * 创建Producer,并开启消息轨迹
         * 如果不想开启消息轨迹,可以按照如下方式创建:
         * DefaultMQProducer producer = new DefaultMQProducer(M"设置自己的GroupName(唯一)", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("设置自己的GroupName(唯一)", getAclRPCHook(), true, null);

		/**
         * 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
    	// 设置NameServer的地址
    	producer.setNamesrvAddr("localhost:9876");
    	// 设置消息同步发送失败时的重试次数,默认为 2
        producer.setRetryTimesWhenSendFailed(2);
        // 设置消息发送超时时间,默认3000ms
        producer.setSendMsgTimeout(3000);
    	// 启动Producer实例
        producer.start();
    	for (int i = 0; i < 100; i++) {
    	    // 创建消息,并指定Topic,Tag和消息体
    	    Message msg = new Message("TopicTest" /* Topic */,
        	"TagA" /* Tag */,
        	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        	);
        	// 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
    	}
    	// 如果不再发送消息,关闭Producer实例。
    	producer.shutdown();
    }
}

3.7.2 Потребители

потребительские настройкиAKиSecertОперация такая же, просто нужно создатьConsummerДостаточно указать время, вот пример получения обычных сообщений:

private static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials("设置自己的ACCESS_KEY", "设置自己的SECRET_KEY"));
}

public static void main(String[] args) throws Exception {
	/**
     * 创建Consumer,并开启消息轨迹
     * 如果不想开启消息轨迹,可以按照如下方式创建:
     * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), null);
     */
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

    /**
     * 设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项.
     */
    consumer.setAccessChannel(AccessChannel.CLOUD);

    // 指定Namesrv地址信息.
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅Topic
    consumer.subscribe("Test", "*");
    //负载均衡模式消费
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消息者
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

4. Пример отправки и потребления сообщений (Spring Boot)

4.1 Импорт зависимостей

   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-spring-boot-starter</artifactId>
       <version>2.1.1</version>
   </dependency>

4.2 Производители

4.2.1 конфигурационный файл application.yaml

# application.yaml
rocketmq:
  name-server: 10.124.128.200:9876
  producer:
    group: test-group
    # 发送同步消息失败时,重试次数,默认是 2
    retry-times-when-send-failed: 2
    # 发送异步消息失败时,重试次数,默认是 2
    retry-times-when-send-async-failed: 2
    # 发送消息超时时间,默认是 3s
    send-message-timeout: 3000

	# 连接阿里云RocketMQ时需要配置AK与SK
    access-key: 
    secret-key: 

4.2.2 Производители

@RestController
@RequestMapping("/test")
public class ProducerTest {

	//自动注入
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/sendSyncMessage")
    public void sendSyncMessage(@RequestBody Map<String, Object> msgMap){
    	//构建消息
    	Message message = new Message("TopicName", "Tag", hash, JSON.toJSONBytes(msgData));
    	
    	//发送同步消息 
    	//方法1:使用与第三章相同的方法,调用 getProducer() 方法时会返回DefaultMQProducer对象,然后调用其方法第三章的一样了。
        SendResult sendResult =  rocketMQTemplate.getProducer().send(message);

		//方法2:使用rocketMQTemplate封装的消息发送方法
		// 第一个参数指定Topic与Tag,格式: `topicName:tags`
		// 第二个参数,Message对象
		sendResult = rocketMQTemplate.syncSend("TopicName:Tag", message);
    }
}

4.2 Потребители

4.2.1 конфигурационный файл application.yaml

rocketmq:
  name-server: 10.124.128.200:9876

  # 下面的配置只有在用阿里云的RocketMQ时,才配置,自己搭建的不需要配置
  consumer:
    access-key: 
    secret-key: 
  access-channel: CLOUD

4.2.2 Слушатели сообщений потребителей

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq", 
		consumerGroup = "springboot-mq-consumer-1",
		selectorExpression = "*")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:" + message);
		
		//如果消费失败,则抛出RuntimeException,RocketMQ会自动重试
		//可以手动抛出,也可以使用 Lombok 的 @SneakyThrows 注解来抛出 RuntimeException
		throw new RuntimeException("消费失败");
    }
}

@RocketMQMessageListenerОбщие параметры конфигурации для аннотаций:

параметр тип По умолчанию иллюстрировать
consumerGroup String группа потребителей
topic String Topic
selectorType SelectorType SelectorType.TAG Используйте TAG или SQL92, чтобы выбрать сообщение, тег по умолчанию
selectorExpression String "*" Контролируйте, какие сообщения могут быть выбраны
consumeMode ConsumeMode ConsumeMode.CONCURRENTLY Режим потребления, одновременный или последовательный прием, одновременный режим по умолчанию
messageModel MessageModel MessageModel.CLUSTERING Режим потребления, широковещательный режим или режим кластера, режим кластера по умолчанию
consumeThreadMax int 64 Максимальное количество потребительских потоков
consumeTimeout long 15L Тайм-аут потребления (максимальное время (в минутах), в течение которого сообщение может заблокировать потребляющий поток)
nameServer String Прочитать в файле конфигурации: ${rocketmq.name-server:} адрес сервера имен
accessKey String Прочитайте в файле конфигурации: ${rocketmq.consumer.access-key:} AK
secretKey String Прочитайте в файле конфигурации: ${rocketmq.consumer.secret-key:} SK
accessChannel String ${rocketmq.access-channel:}
# Пять, хранилище сообщений

Из-за высоких требований к надежности распределенных очередей данные должны храниться постоянно.

消息存储方式

  1. производитель сообщений отправляет сообщение
  2. MQ получает сообщение, сохраняет его и добавляет новую запись в хранилище.
  3. Вернуть ACK производителю
  4. Push-сообщение MQ соответствующему потребителю, а затем дождитесь, пока потребитель вернет ACK.
  5. Если потребитель сообщения успешно возвращает ACK в течение заданного времени, то MQ считает, что потребление сообщения прошло успешно, и удаляет сообщение в хранилище, то есть выполняется шаг 6; если MQ не получает ACK в течение заданного времени, он считает, что потребление сообщения не удалось, и попытается повторно отправить сообщение и повторить шаги 4, 5 и 6.
  6. MQ удалить сообщение

5.1 Носитель данных

В настоящее время несколько продуктов, обычно используемых в отрасли (RocketMQ/Kafka/RabbitMQ), все используют == сброс сообщений == на развернутую виртуальную машину/физическую машину == файловую систему== для сохранения (сброс обычно можно выполнить, разделивАсинхронная щеткаисинхронная щеткадва режима).

Сброс сообщений обеспечивает эффективный, надежный и высокопроизводительный метод сохранения данных для хранения сообщений. Если не развернута сама машина MQ или не завис локальный диск, обычно не возникает сбоев.

5.2 Структура хранения сообщений

RocketMQСообщение хранится вConsumeQueueиCommitLogЗавершено, реальный файл физического хранилища сообщенияCommitLog,ConsumeQueueЭто логическая очередь сообщений, аналогичная индексному файлу базы данных, в которой хранится адрес, указывающий на физическое хранилище. каждыйTopicпод каждымMessage Queueимеет соответствующийConsumeQueueдокумент.

在这里插入图片描述

  • CommitLog: сохранить метаданные сообщения
  • ConsumerQueue: сохранить сообщение вCommitLogиндекс чего-либо
  • IndexFile: Предоставляет способ запрашивать сообщения черезkeyили временной интервал для запроса метода сообщения, это черезIndexFileМетод поиска сообщений не влияет на основной процесс отправки и потребления сообщений.

5.3 Последовательная запись

RocketMQСообщения записываются последовательно, что обеспечивает скорость хранения сообщений.

При правильном использовании диска скорость диска может полностью соответствовать скорости передачи данных в сети. Текущие высокопроизводительные диски,писать последовательноскорость может быть достигнута600MB/s, что превышает скорость передачи обычной сетевой карты. но дискслучайная записьСкорость всего около100KB/s, в 6000 раз хуже, чем производительность при последовательной записи! Из-за такой огромной разницы в скорости хорошая система очередей сообщений будет на несколько порядков быстрее, чем обычная система очередей сообщений.

5.4 Мигающий механизм

RocketMQСообщения сохраняются на диске, что может не только обеспечить восстановление после сбоя питания, но и позволить объему хранимых сообщений превысить лимит памяти.RocketMQДля повышения производительности гарантируется максимально возможная последовательная запись на диск. Когда сообщения записываются в RocketMQ через Producer, существует два способа записи на диск: распределенная синхронная очистка диска и асинхронная очистка диска.

同步刷盘和异步刷盘

1) Синхронная чистка

К моменту возврата статуса успешной записи сообщение было записано на диск. Конкретный процесс заключается в том, что сообщение записывается в памятьPAGECACHEПосле этого немедленно уведомить поток очистки о необходимости сброса диска, а затем дождаться завершения очистки.После выполнения потока очистки он пробуждает ожидающий поток и возвращает статус, что сообщение было записано успешно.

2) Асинхронная очистка

При возврате статуса успешной записи сообщение могло быть просто записано в память.PAGECACHE, возврат операции записи быстрый, а пропускная способность большая; когда количество сообщений в памяти накапливается до определенного уровня, действие записи на диск запускается равномерно для быстрой записи.

3) Конфигурация

И синхронная чистка, и асинхронная чистка выполняются черезBrokerв файле конфигурацииflushDiskTypeЕсли параметр установлен, этот параметр настраивается какSYNC_FLUSH(Синхронизировать),ASYNC_FLUSH(асинхронный) в одном.

5.5 Нулевая копия

Операционная система Linux делится на [пользовательский режим】а также【Состояние ядра], операции с файлами и сетевые операции должны включать переключение между этими двумя формами, а копирование данных неизбежно.

Сервер отправляет содержимое файла локального диска клиенту, что обычно делится на два этапа:

  1. read: читать содержимое локального файла;

  2. write: отправить прочитанное содержимое по сети.

Эти две, казалось бы, простые операции на самом деле выполняли 4 копии данных, а именно:

  1. Копировать данные с диска в память режима ядра;
  2. Копировать из памяти режима ядра в память пользовательского режима;
  3. Затем скопируйте из памяти пользовательского режима в сетевую память режима ядра;
  4. Наконец, он копируется из памяти режима ядра сетевого драйвера на сетевую карту для передачи.

在这里插入图片描述

используяmmapспособ, можетУстранение копирования памяти в пользовательский режим, ускорить. Этот механизм реализован на Java с помощьюMappedByteBufferосуществленный

RocketMQВ полной мере используйте вышеупомянутые функции, которые представляют собой так называемую технологию "==zero copy==", чтобы повысить скорость хранения сообщений и передачи по сети.

Здесь следует отметить, что использованиеMappedByteBufferУ этого способа отображения памяти есть несколько ограничений, одно из которых заключается в том, что вы можете отображать не более1.5файл в виртуальную память пользовательского режима, поэтомуRocketMQПо умолчанию одинCommitLog(хранит метаданные сообщения) Файл данных1Gпричина

6. Механизм высокой доступности

在这里插入图片描述

RocketMQРаспределенные кластерыMasterа такжеSlaveСотрудничество обеспечивает высокую доступность.

Masterа такжеSlaveРазница: вBrokerВ файле конфигурации значение параметра brokerId равно 0, чтобы указать, что брокер является ведущим, больше 0, чтобы указать, что брокер является подчиненным, а параметр BrokerRole также указывает, является ли брокер ведущим или подчиненным. .

Роль Broker of the Master поддерживает чтение и запись, а роль Broker of the Slave поддерживает только чтение, то есть Producer может подключаться к Broker роли Master только для написания сообщений; Consumer может подключаться к Broker роли Роль Мастера, либо ее можно подключить к Роли Брокера ведомой для считывания информации.

6.1 Высокая доступность потребления сообщений

В конфигурационном файле Потребителя нет необходимости указывать, читать с Мастера или со Слейва, когда Мастер недоступен или занят, Потребитель будет автоматически переключаться на чтение с Слейва. С механизмом автоматического переключения Потребителей, когда машина с ролью Мастера выходит из строя, Потребитель все еще может читать сообщения от Ведомого, не затрагивая программу Потребителя. Это обеспечивает высокую доступность со стороны потребителя.

6.2 Высокая доступность отправки сообщений

При создании темы создайте несколько очередей сообщений темы в нескольких группах посредников (компьютеры с одинаковым именем посредника и разными идентификаторами брокера образуют группу посредников), чтобы, когда мастер группы посредников недоступен, мастера других групп по-прежнему доступен, производитель все еще может отправлять сообщения. RocketMQ в настоящее время не поддерживает автоматическое преобразование слейва в мастер.Если ресурсов машины недостаточно и слейв необходимо преобразовать в мастер, необходимо вручную остановить брокер роли слейва, изменить файл конфигурации и запустить брокер с новым файлом конфигурации.

在这里插入图片描述

6.3 Репликация ведущий-ведомый

Если в группе посредников есть Master и Slave, сообщения необходимо реплицировать от Master к Slave.Существует два метода репликации: синхронный и асинхронный.

1) Синхронная репликация

Синхронный метод репликации заключается в том, чтобы дождаться успешной записи ведущего и ведомого устройств, прежде чем сообщать об успешном состоянии записи клиенту;

В режиме синхронной репликации, если мастер выходит из строя, все резервные данные остаются у ведомого, которые легко восстановить, но синхронная репликация увеличит задержку записи данных и снизит пропускную способность системы.

2) Асинхронная репликация

Метод асинхронной репликации заключается в том, что до тех пор, пока мастер выполняет успешную запись, он может сообщать клиенту о состоянии успешной записи.

В режиме асинхронной репликации система имеет меньшую задержку и более высокую пропускную способность, но в случае сбоя ведущего часть данных может быть потеряна, поскольку они не записываются на ведомое устройство;

3) Конфигурация

Синхронная репликация и асинхронная репликация выполняются через файл конфигурации Broker.brokerRoleПараметр установлен, и значение этого параметра:

  • ASYNC_MASTER: главный узел асинхронной репликации
  • SYNC_MASTER: главный узел синхронной репликации
  • SLAVE: подчиненный узел

4) Резюме

在这里插入图片描述

В практических приложениях необходимо комбинировать бизнес-сценарии и задавать разумные настройки.Кисть метода такжерепликация master-slaveспособ, особенноSYNC_FLUSH(синхронной очистки), так как действие записи на диск часто запускается, производительность будет значительно снижена.

В норме должно бытьMasterа такжеSlaveнастроен какASYNC_FLUSH(Асинхронная щетка) метод очистки, ведущий и ведомый настроены какSYNC_MASTER(синхронная репликация) метод репликации, так что даже в случае сбоя одной машины она все равно может гарантировать, что данные не будут потеряны, что является хорошим выбором.

Семь, балансировка нагрузки

7.1 Балансировка нагрузки производителя

На стороне производителя, когда каждый экземпляр отправляет сообщение, == будет опрашивать все очереди сообщений для отправки == по умолчанию, чтобы сообщения в среднем попадали в разные средние значения.queueначальство. И из-заqueueможно разбросать по разнымbroker, поэтому сообщение отправляется другомуbrokerвниз, как показано ниже:

在这里插入图片描述

Метки на линиях со стрелками на рисунке представляют последовательность: издатель отправит первое сообщение в очередь 0, затем второе сообщение в очередь 1 и так далее.

7.2 Балансировка потребительской нагрузки

1) Кластерный режим

В режиме потребления кластера каждая подписка на этотtopicизгруппа потребителейбудут получать сообщения, каждое сообщение будет получено только однимгруппа потребителейЭкземпляр потребления в . RocketMQ использует активное извлечение для извлечения и использования сообщений.При извлечении вам необходимо четко указать, какую очередь сообщений извлекать.

Каждый раз, когда количество экземпляров изменяется, будет запускаться балансировка нагрузки для всех экземпляров, при этом очереди будут равномерно распределяться между каждым экземпляром в соответствии с количеством очередей и количеством экземпляров.

Алгоритм распределения по умолчанию:AllocateMessageQueueAveragely,Как показано ниже:

在这里插入图片描述

Другой алгоритм усредненияAllocateMessageQueueAveragelyByCircle, который также распределяет каждую очередь поровну, но в виде кольцевой очереди, как показано на следующем рисунке:

consumer负载均衡2

Следует отметить, что в кластерном режиме может быть выделен только один экземпляр очереди, потому что, если несколько экземпляров потребляют сообщения из одной очереди одновременно, поскольку извлекаемые сообщения активно контролируются потребителем, приводят к одному и тому же сообщению.Оно потребляется несколько раз в разных экземплярах, поэтому алгоритмически очередь назначается только одному экземпляру потребителя, а экземпляр потребителя может быть назначен разным очередям одновременно.

увеличиваяconsumerэкземпляр для распределенияqueueпотребление может играть роль в горизонтальном расширении потребительской способности. Когда экземпляр отключается, балансировка нагрузки запускается повторно. В это время исходный выделенныйqueueОн будет выделен другим экземплярам для продолжения потребления.

но еслиconsumerколичество экземпляровmessage queueЕсли общее количествоconsumerэкземпляр не будет назначенqueue, он не сможет использовать сообщение и не сможет выполнять роль распределения нагрузки. == Поэтому необходимо контролировать, чтобы общее количество очередей было больше или равно количеству потребителей. ==

2) Режим трансляции

Поскольку сообщение должно быть доставлено всем экземплярам потребителей в группе потребителей в широковещательном режиме, не существует такой вещи, как амортизированное потребление сообщений.

С точки зрения реализации одно из отличий состоит в том, что когда потребители выделяют очереди, все потребители выделяются для всех очередей.

Восемь, повтор сообщения

8.1 Повтор последовательных сообщений

Для последовательных сообщений, когда потребителю не удается обработать сообщение, очередь сообщений RocketMQ будет автоматически повторять сообщение непрерывно (каждый интервал равен 1 секунде).В это время приложение будет заблокировано при потреблении сообщения. Поэтому при использовании последовательных сообщений необходимо убедиться, что приложение может своевременно отслеживать и обрабатывать сбои потребления, чтобы избежать блокировки.

8.2 Повторная попытка сообщений не по порядку

Для сообщений не по порядку (обычные, синхронизированные, отложенные, транзакционные сообщения), когда потребителю не удается использовать сообщение, вы можете установить статус возврата, чтобы получить результат повторной попытки сообщения.

Повторная попытка сообщений не по порядку действует только для режима потребления кластера; широковещательный режим не предоставляет функцию повторной попытки при сбое, то есть после сбоя потребления неудачное сообщение не будет повторяться, а новые сообщения будут продолжаться. потребляться.

1) Количество попыток

Очередь сообщений RocketMQ позволяет по умолчанию повторять каждое сообщение до 16 раз.Интервал между каждой повторной попыткой следующий:

количество повторных попыток интервал времени с последней попытки количество повторных попыток интервал времени с последней попытки
1 10 секунд 9 7 минут
2 30 секунд 10 8 минут
3 1 минута 11 9 минут
4 2 минуты 12 10 минут
5 3 минуты 13 20 минут
6 4 минуты 14 30 минут
7 5 минут 15 1 час
8 6 минут 16 два часа

Если сообщение не удается отправить после 16 попыток, сообщение больше не будет доставлено. Если указанный выше интервал повторных попыток строго рассчитан, сообщение будет повторено 16 раз в течение следующих 4 часов и 46 минут при условии, что потребление сообщения было безуспешным, и сообщение не будет повторено для доставки за пределами этого временного диапазона. . . .

Уведомление:Независимо от того, сколько раз сообщение было отправлено повторно, идентификатор этих повторных сообщений не изменится.

2) Метод конфигурации

После сбоя потребления повторите попытку метода настройки

В режиме потребления кластера, если потребление сообщения не удается и ожидается, что сообщение будет отправлено повторно, его необходимо явно настроить в реализации интерфейса прослушивателя сообщений (выберите один из трех методов):

  • вернутьAction.ReconsumeLater(рекомендовать)
  • вернуть ноль
  • Выбросить исключение
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}

После сбоя потребления метод конфигурации не повторяется.

В режиме потребления кластера после сбоя сообщения ожидается, что сообщение не будет повторено.Необходимо перехватывать исключения, которые могут быть выброшены в логике потребления, и, наконец, возвращатьAction.CommitMessage, сообщение не будет повторно отправлено после этого.

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

Пользовательское сообщение максимальное количество повторов

Очередь сообщений RocketMQ позволяет потребителю установить максимальное количество повторных попыток при запуске, а интервал повторных попыток будет соответствовать следующей стратегии:

  • Если максимальное количество попыток меньше или равно 16, интервал времени повтора будет таким же, как указано в таблице выше.
  • Максимальное количество повторных попыток превышает 16, а интервал повторных попыток для более чем 16 повторных попыток составляет 2 часа каждый раз.
Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

Уведомление:

  • Параметр максимального количества повторных попыток сообщения действителен для всех экземпляров Потребителей с одним и тем же идентификатором группы.
  • Если MaxReconsumeTimes задан только для одного из двух экземпляров потребителей с одним и тем же идентификатором группы, конфигурация вступает в силу для обоих экземпляров потребителей.
  • Конфигурация вступает в силу путем перезаписи, то есть последний запущенный экземпляр Consumer перезапишет конфигурацию ранее запущенного экземпляра.

Получить повторные попытки сообщения

После того, как потребитель получит сообщение, количество повторных попыток сообщения можно получить следующим образом:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //获取消息的重试次数
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}

3) Повторите попытку с несколькими группами потребителей.

Предположим, что есть группа потребителей A и группа потребителей B, когда A и B слушают одно и то же.topic, и A, и B получают одно и то же сообщение, но A не может потреблять (return Action.ReconsumeLater), но B успешно потребляет. Затем при повторной попыткеrocketMQСообщение будет отправлено только группе потребителей B и не будет отправлено группе потребителей A.

9. Очередь недоставленных писем

Когда сообщение не может быть использовано в первый раз, очередь сообщений RocketMQ автоматически повторит попытку сообщения; после достижения максимального количества попыток, если потребление все еще не удается, это означает, что потребитель не может правильно использовать сообщение при нормальных обстоятельствах. В это время очередь сообщений RocketMQ Сообщение не отбрасывается сразу, а отправляется в специальную очередь, соответствующую этому потребителю.

В очереди сообщений RocketMQ такие сообщения, которые не могут быть использованы в обычных условиях, называются недоставленными сообщениями, а специальная очередь, в которой хранятся недоставленные сообщения, называется очередью недоставленных сообщений.

9.1 Функции недоставленных писем

сообщение с мертвой буквойИмеет следующие характеристики:

  • Он больше не будет нормально потребляться потребителями.
  • Срок действия такой же, как у обычных сообщений, которые составляют 3 дня, и будут автоматически удалены через 3 дня. Поэтому, пожалуйста, обработайте сообщение о недоставленном письме незамедлительно в течение 3 дней после его создания.

очередь недоставленных сообщенийИмеет следующие характеристики:

  • Очередь недоставленных сообщений соответствует идентификатору группы, а не отдельному экземпляру потребителя.
  • Если идентификатор группы не генерирует недоставленное сообщение, очередь сообщений RocketMQ не создаст для него соответствующую очередь недоставленных сообщений.
  • Очередь недоставленных сообщений содержит все недоставленные сообщения, сгенерированные соответствующим идентификатором группы, независимо от того, к какой теме относится сообщение.

9.2 Просмотр информации о недоставленных письмах

  1. Запросите информацию о теме очереди недоставленных сообщений в консоли.

    在这里插入图片描述

  2. Запрос недоставленных сообщений по теме в интерфейсе сообщений

    在这里插入图片描述

  3. Выберите, чтобы повторно отправить сообщение

    Когда сообщение попадает в очередь недоставленных сообщений, это означает, что какой-то фактор не позволяет потребителю нормально использовать сообщение, поэтому оно обычно требует от вас специальной обработки. После проверки подозрительных факторов и решения проблемы вы можете повторно отправить сообщение в консоль RocketMQ очереди сообщений, чтобы потребители могли его снова использовать.

Десять, идемпотент потребления

После получения сообщения потребитель очереди сообщений RocketMQ должен выполнить идемпотентную обработку сообщения в соответствии с уникальным ключом в бизнесе.

10.1 Необходимость идемпотентности потребления

В интернет-приложениях, особенно когда сеть нестабильна, сообщения в очереди сообщений RocketMQ могут повторяться.Это повторение можно обобщить следующим образом:

  • Дублирование сообщения при отправке

    Когда сообщение было успешно отправлено на сервер и сохраняется, происходит сбой в сети или клиент не работает, в результате чего сервер не отвечает клиенту. Если производитель понимает, что сообщение не удалось отправить, и пытается отправить сообщение снова, потребитель получит два сообщения с одинаковым содержимым и одним и тем же идентификатором сообщения.

  • Дублирование сообщения о доставке

    В сценарии потребления сообщения сообщение было доставлено потребителю и бизнес-обработка завершена.Когда клиент отправляет ответ на сервер, сеть отключается. Чтобы гарантировать, что сообщение будет потреблено хотя бы один раз, сервер очереди сообщений RocketMQ попытается снова доставить ранее обработанное сообщение после восстановления сети, а потребитель впоследствии получит два сообщения с одинаковым содержимым и одинаковым Идентификатор сообщения.

  • Дублирование сообщений во время балансировки нагрузки (включая, помимо прочего, дрожание сети, перезапуск брокера и перезапуск приложения подписчика)

    Когда брокер или клиент очереди сообщений RocketMQ перезапускается, расширяется или сжимается, будет запущена перебалансировка, и в это время потребители могут получать повторяющиеся сообщения.

10.2 Обращение

Поскольку идентификатор сообщения может конфликтовать (дублироваться), не рекомендуется использовать идентификатор сообщения в качестве основы для действительно безопасной идемпотентной обработки. Лучше всего использовать уникальный бизнес-идентификатор в качестве основы для идемпотентной обработки, а уникальный идентификатор бизнеса можно установить через сообщение Key:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

Когда подписчик получает сообщение, он может выполнить идемпотентную обработку в соответствии с ключом сообщения:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的 key 做幂等处理
    }
});

11. Меры предосторожности при использовании RocketMQ

  1. В той же группе потребителейПотребительская логика должна быть такой же(мониторингtopic,tagподобные)
  2. В конфигурации по умолчанию сообщения распределяются между различными группами потребителей (все группы потребителей могут получать одно и то же сообщение), а потребители в группе потребителей распределяются по нагрузке (только один потребитель получит сообщение).