RocketMQ быстро запускается

RocketMQ

Начать быстро

Это краткое руководство относится к настройке системы обмена сообщениями RocketMQ на локальном компьютере и содержит подробные инструкции по базовому производству и использованию.

необходимая среда

  1. 64-битная операционная система
  2. 64-разрядный JDK1.8+
  3. Maven 3.2.x +
  4. Git
  5. Место для хранения выше 4G используется для экземпляра прокси

Скачайте и соберите релизную версию

существуетздесьИсходный код официальной версии 4.4.0 можно скачать, а также наздесьСкачать бинарную версию

Теперь выполните следующие команды, чтобы распаковать исходную версию 4.4.0 и собрать компоненты двоичного пакета.

  > unzip rocketmq-all-4.4.0-source-release.zip
  > cd rocketmq-all-4.4.0/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

mvn -Prelease-all -DskipTests clean install -UКоманда перейдет к сборке загруженного пакета исходного кода, а затем создаст целевой каталог в каталоге дистрибутива, где находится собранный пакет.

Запустить сервер имен

charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/targe
t/apache-rocketmq$ nohup sh bin/mqnamesrv &
[1] 5273
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distribution/targe
t/apache-rocketmq$ nohup: 忽略输入并把输出追加到'nohup.out'
tail -f ~/logs/rocketmqlogs/namesrv.log
2019-03-31 19:24:10 INFO main - tls.client.keyPath = null
2019-03-31 19:24:10 INFO main - tls.client.keyPassword = null
2019-03-31 19:24:10 INFO main - tls.client.certPath = null
2019-03-31 19:24:10 INFO main - tls.client.authServer = false
2019-03-31 19:24:10 INFO main - tls.client.trustCertPath = null
2019-03-31 19:24:11 INFO main - Using OpenSSL provider
2019-03-31 19:24:11 INFO main - SSLContext created for server
2019-03-31 19:24:12 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-31 19:24:12 INFO FileWatchService - FileWatchService service started
2019-03-31 19:24:12 INFO main - The Name Server boot success. serializeType=JSON
2019-03-31 19:25:11 INFO NSScheduledThread1 - --------------------------------------------------------
2019-03-31 19:25:11 INFO NSScheduledThread1 - configTable SIZE: 0

можно увидетьThe Name Server boot successВы можете знать, что запуск прошел успешно, а метод сериализации — JSON.

Запустить брокерский сервер

charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distr
ibution/target/apache-rocketmq$ nohup sh bin/mqbroker -n localhost:9876 &
[1] 5784
nohup: 忽略输入并把输出追加到'nohup.out'
charse@charse-thinkpad:/media/charse/文档/Code/study/MQ/rocketmq-all-4.4.0/distr
ibution/target/apache-rocketmq$ tail -f ~/logs/rocketmqlogs/broker.log 
2019-03-31 19:41:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:41:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 19:41:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 19:42:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-31 19:42:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 19:42:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK

Вы можете видеть, что брокер успешно зарегистрирован на сервере имен.

В то же время вы можете использовать команду jps, чтобы проверить, успешно ли запущена служба.

charse@charse-thinkpad:~$ jps
12128 Main
12549 Jps
5279 NamesrvStartup
5791 BrokerStartup

Как видите, и NameServer, и Broker успешно запущены, поэтому мы можем перейти к следующему шагу, чтобы смоделировать отправителя и получателя.

Broker

При программном создании сообщений в тему (TopicTest) посредник обнаруживает, что тема не существует, затем посредник создает тему (TopicTest) по умолчанию и настраивает конфигурацию по умолчанию. Производитель показывает адрес моей локальной сети (192.168.3.16)

2019-03-31 20:37:43 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /192.168.3.16:47538
2019-03-31 20:37:43 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[192.168.3.16:47538]
2019-03-31 20:37:43 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO HeartbeatThread_1 - new producer connected, group: producer1 channel: ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863842]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer[producer1] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863927]
2019-03-31 20:37:43 INFO ClientManageThread_1 - unregister a producer group[producer1] from groupChannelTable
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x9725cb35, L:/192.168.3.16:10911 - R:/192.168.3.16:48222], clientId=192.168.3.16@14206, language=JAVA, version=293, lastUpdateTimestamp=1554035863933]
2019-03-31 20:37:43 INFO ClientManageThread_2 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 20:37:52 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:38:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes

Когда вы создаете производителя, вы можете видеть, что он сначала создаст его.CLIENT_INNER_PRODUCERпродюсер вgroupChannelTableа затем создать производителя клиента, а именноproducer1зарегистрироваться вgroupChannelTable, вы можете увидеть некоторую информацию в этом производителе. Но когда производитель закрывает shutdown. Сначала закройте клиентproducer1Затем сгруппируйтеChannelTable изCLIENT_INNER_PRODUCERУдалить. Агент будет время от времени регистрироваться на сервере имен.

Когда клиент создает потребителя, как показано в выходных данных журнала агента, как показано на рисунке ниже, вы можете видеть, что при наличии потребителя будет создана группа подписки, и информация о конфигурации группы подписки будет создан, а затем, после подключения нового потребителя, Topi будет добавлен в соответствующую группу, в которую входит тема (Topic), на которую вы подписаны, и одновременно будет добавлена ​​повторная темаz.Эта тема основана на%RETRY%消费者group名称Named и добавьте подписку. В то же время также создается новый производитель, которыйCLIENT_INNER_PRODUCER

2019-03-31 20:38:22 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - auto create a subscription group, SubscriptionGroupConfig [groupName=consumer1, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - create new topic TopicConfig [topicName=%RETRY%consumer1, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2019-03-31 20:38:35 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915211]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG]]] 192.168.3.16:48236
2019-03-31 20:38:35 INFO HeartbeatThread_2 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x1c14007d, L:/192.168.3.16:10911 - R:/192.168.3.16:48236], clientId=192.168.3.16@14311, language=JAVA, version=293, lastUpdateTimestamp=1554035915412]
2019-03-31 20:38:35 INFO HeartbeatThread_3 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913253, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915833, expressionType=TAG]
2019-03-31 20:38:35 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035913345, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554035915843, expressionType=TAG]
2019-03-31 20:38:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 20:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 20:39:22 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-31 20:39:52 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-31 20:40:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes

NameServer

Когда тема не создана, можно увидеть вывод журнала в nameever, и многие темы создаются по умолчанию в Roacket MQ.

2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:48210
2019-03-31 19:39:28 INFO NettyServerCodecThread_1 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:48210]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, charse-thinkpad QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, BenchmarkTest QueueData [brokerName=charse-thinkpad, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, OFFSET_MOVED_EVENT QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, TBW102 QueueData [brokerName=charse-thinkpad, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, SELF_TEST_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new topic registered, DefaultCluster QueueData [brokerName=charse-thinkpad, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]
2019-03-31 19:39:41 INFO RemotingExecutorThread_1 - new broker registered, 192.168.3.16:10911 HAServer: 192.168.3.16:10912
2019-03-31 19:40:12 INFO RemotingExecutorThread_4 - new topic registered, RMQ_SYS_TRANS_HALF_TOPIC QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]

При использовании клиента для создания темы (TopicTest) видно, что тема зарегистрирована на сервере имен.

2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49208
2019-03-31 20:37:42 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO RemotingExecutorThread_4 - new topic registered, TopicTest QueueData [brokerName=charse-thinkpad, readQueueNums=4, writeQueueNums=4, perm=6, topicSynFlag=0]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:49208]
2019-03-31 20:37:43 INFO NettyServerCodecThread_2 - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:49208]
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelRegistered 127.0.0.1:49224
2019-03-31 20:38:34 INFO NettyServerCodecThread_3 - NETTY SERVER PIPELINE: channelActive, the channel[127.0.0.1:49224]
2019-03-31 20:38:35 INFO RemotingExecutorThread_4 - new topic registered, %RETRY%consumer1 QueueData [brokerName=charse-thinkpad, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]

Когда потребитель закрыт, потребитель клиента будетconsumer1отconsumerGroupInfoчтобы выйти из системы . Затем установитеCLIENT_INNER_PRODUCERотgroupChannelTableвыйти из системы.

2019-03-31 21:50:08 INFO HeartbeatThread_3 - new consumer connected, group: consumer1 CONSUME_PASSIVELY CLUSTERING channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - subscription changed, add new topic, group: consumer1 SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]
2019-03-31 21:50:08 INFO HeartbeatThread_3 - registerConsumer info changed ConsumerData [groupName=consumer1, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_FIRST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=TopicTest, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206960, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG]]] 192.168.3.16:49810
2019-03-31 21:50:08 INFO HeartbeatThread_3 - new producer connected, group: CLIENT_INNER_PRODUCER channel: ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208700]
2019-03-31 21:50:08 INFO HeartbeatThread_4 - subscription changed, group: consumer1 OLD: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040206999, expressionType=TAG] NEW: SubscriptionData [classFilterMode=false, topic=%RETRY%consumer1, subString=*, tagsSet=[], codeSet=[], subVersion=1554040208756, expressionType=TAG]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister a consumer[consumer1] from consumerGroupInfo ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208763]
2019-03-31 21:50:08 INFO ClientManageThread_3 - unregister consumer ok, no any connection, and remove consumer group, consumer1
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer[CLIENT_INNER_PRODUCER] from groupChannelTable ClientChannelInfo [channel=[id: 0x7e95d6ee, L:/192.168.3.16:10911 - R:/192.168.3.16:49810], clientId=192.168.3.16@23852, language=JAVA, version=293, lastUpdateTimestamp=1554040208801]
2019-03-31 21:50:08 INFO ClientManageThread_4 - unregister a producer group[CLIENT_INNER_PRODUCER] from groupChannelTable
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-31 21:50:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 1780 bytes
2019-03-31 21:50:22 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
2019-03-31 21:50:28 WARN PullMessageThread_2 - the consumer's group info not exist, group: consumer1
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - processRequestWrapper response to /192.168.3.16:49810 failed
java.nio.channels.ClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=11, language=JAVA, version=293, opaque=24, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=2, suspendTimeoutMillis=15000, commitOffset=0, topic=%RETRY%consumer1, queueOffset=0, expressionType=TAG, subVersion=1554040208756, consumerGroup=consumer1}, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:28 ERROR NettyServerNIOSelector_3_3 - RemotingCommand [code=24, language=JAVA, version=293, opaque=24, flag(B)=1, remark=the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details., extFields=null, serializeTypeCurrentRPC=JSON]
2019-03-31 21:50:52 INFO brokerOutApi_thread_4 - register broker to name server localhost:9876 OK
2019-03-31 21:51:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes

Пример производства и потребления

  • Используйте RocketMQ для отправки и использования тремя способами: надежный синхронный метод, надежный асинхронный метод и односторонняя доставка.
  • Использование сообщений с помощью RocketMQ

1. Добавьте зависимости

опытный способ:

 <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

2.1 Синхронная отправка и потребление

Надежная синхронная передача широко используется в важных уведомительных сообщениях, SMS-уведомлениях, системах SMS-маркетинга и других сценариях.

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

2.2 Отправка сообщений асинхронно

Асинхронные передачи часто используются в бизнес-сценариях, чувствительных ко времени отклика.

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
                final int index = i;
                //Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                    }
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

2.3 Трансфер в одну сторону

Односторонние передачи используются в ситуациях, требующих умеренной надежности, например при сборе журналов.

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);

        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

3. Пример использования сообщения

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

Дополнительные примеры использования RocketMQ см.здесь.

последовательное сообщение

RocketMQ предоставляет последовательную очередь сообщений "первым пришел - первым вышел". В следующем примере показана отправка/получение глобально и частично упорядоченных сообщений.

Пример отправки сообщения

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("example_group_name");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

Пример сообщения о подписке

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

широковещательное сообщение

Широковещательная рассылка — это отправка сообщения всем подписчикам мастера, и это хороший выбор, если вы хотите, чтобы все подписки получали сообщения по теме.

Пример производства

ublic class BroadcastProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 100; i++){
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

пример потребления

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //set to broadcast mode
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

сообщение по времени

Запланированные сообщения отличаются от обычных сообщений тем, что они доставляются после установленного времени задержки.

1. Запустите потребитель для ожидания входящих сообщений о подписке

public class ScheduledMessageConsumer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate message consumer
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
         // Subscribe topics
         consumer.subscribe("TestTopic", "*");
         // Register message listener
         consumer.registerMessageListener(new MessageListenerConcurrently() {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                 for (MessageExt message : messages) {
                     // Print approximate delay time period
                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                 }
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         // Launch consumer
         consumer.start();
     }
 }

2. Отправляйте сообщения по времени

public class ScheduledMessageProducer {
    
     public static void main(String[] args) throws Exception {
         // Instantiate a producer to send scheduled messages
         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
    
         // Shutdown producer after use.
         producer.shutdown();
     }
        
 }

messageDelayLevel=1с 5с 10с 30с 1м 2м 3м 4м 5м 6м 7м 8м 9м 10м 20м 30м 1ч 2ч Среди них уровень = 0 означает отсутствие задержки, уровень = 1 означает задержку уровня 1, уровень = 2 означает задержку уровня 2 и так далее. Можно заметить, что время потребления сообщения будет на 10 секунд позже, чем время его сохранения.

Массовая рассылка

Зачем отправлять оптом?

Пакетная отправка сообщений может повысить производительность передачи небольших сообщений.

ограничения на использование

Один и тот же пакет сообщений должен иметь: одинаковую тему, один и тот же waitstoremsgok и не поддерживает сообщения по времени.Кроме того, общий размер тела каждого отправляемого сообщения не должен превышать 1 МБ.

Как использовать массовую рассылку

Массовое использование легко, если только сообщения размером не более 1 МБ байт отправляются за раз.

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
    producer.send(messages);
} catch (Exception e) {
    e.printStackTrace();
    //handle the error
}
    

Разбить большое число на списки

Сложность пакетной отправки увеличивается только при отправке больших пакетов сообщений, и может быть невозможно определить, превышает ли размер тела сообщения, отправляемого пакетами, предельный размер 1 МБ.В это время вам необходимо разделить сообщение на список

public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
            this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {
                //it is unexpected that single message exceeds the SIZE_LIMIT
                //here just let it go, otherwise it will block the splitting process
                if (nextIndex - currIndex == 0) {
                   //if the next sublist has no element, add this one and then break, otherwise just break
                   nextIndex++;  
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
    
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       //handle the error
   }
}

Пример фильтра сообщений

В большинстве случаев теги представляют собой простую и удобную конструкцию для выбора нужного сообщения, например:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

В приведенном выше примере потребитель получит сообщение, содержащее TAGA, TAGB или TAGC, но ограничение состоит в том, что сообщение может иметь только один тег, что может не подходить для сложных сценариев.В этом случае можно использовать выражения SQL. исходящие сообщения.

принцип

Функция SQL может выполнять некоторые вычисления через свойства, введенные при отправке сообщения.Согласно синтаксису, определенному RocketMQ.Вы можете реализовать некоторую интересную логику

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

грамматика

RocketMQ определяет только базовый синтаксис для поддержки этой функции, и вы также можете легко расширить его.

  1. Numeric comparison, like >, >=, <, <=, BETWEEN, =;
  2. Character comparison, like =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. Logical AND, OR, NOT;

Тип константы:

  1. Numeric, like 123, 3.1415;
  2. Символ, такой как «abc», должен быть заключен в одинарные кавычки;
  3. NULL, special constant;
  4. Boolean, TRUE or FALSE;

ограничения на использование

Только push использует это для выбора сообщений через sql92.Интерфейс выглядит следующим образом:

public void subscribe(final String topic, final MessageSelector messageSelector)

Пример производства

Свойства можно задавать в сообщении методом putUserProperty при отправке.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown();

экземпляр потребления

Используйте MessageSelector.bySql для использования сообщений.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

открытое сообщение

Open Messaging, который включает в себя определение отраслевых рекомендаций и спецификаций обмена сообщениями, потоковой передачи, обеспечивает общую структуру для финансов, электронной коммерции, IoT и больших данных.Принципы проектирования ориентированы на облако, просты, гибки и не зависят от языка для разнородных Соответствие этим спецификациям позволит разрабатывать гетерогенные приложения для обмена сообщениями на всех основных платформах и операционных системах.

RocketMQ предоставляет частичную реализацию OpenMessaging 0.1.0-alpha.В следующем примере демонстрируется доступ к RocketMQ на основе OpenMessaging.

OMSProducer

В следующих примерах показано, как использовать RocketMQ для отправки синхронных сообщений, асинхронных сообщений и односторонних сообщений.

public class OMSProducer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final Producer producer = messagingAccessPoint.createProducer();

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        producer.startup();
        System.out.printf("Producer startup OK%n");

        {
            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
            SendResult sendResult = producer.send(message);
            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
        }

        {
            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            result.addListener(new PromiseListener<SendResult>() {
                @Override
                public void operationCompleted(Promise<SendResult> promise) {
                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                }

                @Override
                public void operationFailed(Promise<SendResult> promise) {
                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
                }
            });
        }

        {
            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            System.out.printf("Send oneway message OK%n");
        }

        producer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPullConsumer

Используйте OMSPullConsumer для извлечения сообщений из специальной очереди.

public class OMSPullConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        Message message = consumer.poll();
        if (message != null) {
            String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

OMSPushConsumer

Присоедините PushConsumer OMS к указанной очереди и используйте сообщения с помощью MessageListenner.

public class OMSPushConsumer {
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

        final PushConsumer consumer = messagingAccessPoint.
            createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                consumer.shutdown();
                messagingAccessPoint.shutdown();
            }
        }));
        
        consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
            @Override
            public void onMessage(final Message message, final ReceivedMessageContext context) {
                System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
                context.ack();
            }
        });
        
    }
}

Пример транзакционного сообщения

Что такое транзакционное сообщение?

Думайте об этом как о двухэтапной реализации сообщения фиксации, чтобы обеспечить конечную согласованность в распределенных системах. Транзакционные сообщения гарантируют, что выполнение локальных транзакций и отправка сообщений выполняются автоматически.

ограничения на использование

  • Транзакционные сообщения не поддерживают синхронизированные и массовые операции.
  • Во избежание многократной проверки одного сообщения, что приводит к невыполнению половины сообщений в очереди, мы по умолчанию ограничиваем количество проверок отдельных сообщений до 15. Пользователи могут изменить это ограничение, изменив параметр «TransactionCheckMax» в Конфигурация брокера. Если сообщение проверено. Если количество раз превышает «TransactionCheckMax», настроенное в агенте, агент отбрасывает сообщение и распечатывает журнал ошибок по умолчанию. Пользователь может изменить это поведение, переопределив класс «AbstractTransationCheckListener». .
  • Сообщения о транзакциях будут проверяться в рамках события transactionTimeout, этот параметр можно настроить в конфигурации брокера, а также пользователь может изменить это ограничение, установив пользовательское свойство "CHECK_IMMUNITY_TIME_IN_SECONDS". При отправке транзакционных сообщений этот параметр имеет приоритет над параметром «transactionMsgTimeout».
  • Сообщения о транзакциях могут быть проверены или использованы несколько раз.
  • Перемещение отправленных сообщений в целевую тему пользователя может завершиться ошибкой. В настоящее время это зависит от ведения журнала. Высокая доступность гарантируется собственным механизмом высокой доступности RocketMQ.Если вы хотите гарантировать, что сообщения транзакций не будут потеряны и гарантируется целостность транзакций, рекомендуется использовать синхронный механизм двойной записи.
  • Идентификатор производителя транзакционных сообщений не может быть передан с помощью идентификаторов производителей других типов сообщений. В отличие от других типов сообщений, транзакционные сообщения позволяют обратные запросы. Сервер MQ клиента запрашивает идентификатор производителя.

Транзакционный статус

  • TransactionStatus.CommitTransaction: фиксирует транзакцию, что означает, что потребителю разрешено использовать сообщение.
  • TransactionStatus.RollbackTransaction: откат транзакции, что означает, что сообщение будет удалено и не будет разрешено для использования.
  • TransactionStatus.Unknown: промежуточный статус, что означает, что MQ необходимо проверить, чтобы определить статус.

Создать сообщение о транзакции

Используя класс TransactionMQProducer для создания клиента производителя и указав уникальную группу производителей, вы можете настроить собственный пул потоков для обработки запросов на проверку. После выполнения локальной транзакции вам необходимо ответить MQ в соответствии с результатом выполнения, а статус ответа описан в разделе выше.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

Использование транзакционных сообщений

Метод «executeLocalTransaction» используется для выполнения локальной транзакции при успешной отправке полусообщения. Он возвращает одно из трех состояний транзакции, упомянутых в предыдущем разделе. Метод «checkLocalTransaction» используется для проверки состояния локальной транзакции и ответа на запросы проверки MQ. Он также возвращает одно из трех состояний транзакции, упомянутых в предыдущем разделе.

   import ...
   
   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);
   
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
   
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
   
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }