Краткое содержание цикла статей (обновление)
- Серия анализа исходного кода
- Анализ технологии SpringBoot Series-FatJar
- SpringBoot Series — Анализ процесса запуска
- SpringBoot Series — Анализ механизма событий
- Серия SpringBoot — жизненный цикл и расширение компонентов Bean
- SpringBoot Series — Анализ структуры логов
- SpringBoot Series — анализ доступа к ресурсам
- SpringBoot Series — Анализ встроенного веб-контейнера
- Серия SpringBoot — Анализ конфигурации
- Серия SpringBoot — автоматическая настройка и анализ механизма запуска
- Практическая серия
Быстрый старт RocketMQ
Введение в RocketMQ: Apache RocketMQ — это распределенная платформа обмена сообщениями и потоковой передачи с малой задержкой, высокой производительностью и надежностью, емкостью триллионного уровня и гибкой масштабируемостью. Он предоставляет множество функций, конкретная ссылка:GitHub.com/Apache/рок….
Как указано в официальном руководстве Quick Start, для установки RocketMQ требуются следующие условия:
- 64-битная ОС, рекомендуется Linux/Unix/Mac
- 64bit JDK 1.8+
- Maven 3.2.x
- 4G+ бесплатного диска для сервера Broker (это требует особого внимания)
Скачать, установить и скомпилировать
wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
unzip rocketmq-all-4.7.0-source-release.zip
cd rocketmq-all-4.7.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
1. Запустите сервер имен
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
2. Запустите брокера
> nohup sh bin/mqbroker -n localhost:9876 &
# nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
autoCreateTopicEnable: при использовании RocketMQ для отправки сообщения необходимо указать тему. Для настройки темы существует переключатель autoCreateTopicEnable. Как правило, в среде разработки и тестирования используется настройка по умолчанию autoCreateTopicEnable = true, но это сделает настройку темы трудно стандартизировать управление, нет единого аудита и т. д., поэтому в официальной среде параметр autoCreateTopicEnable=false будет установлен при запуске Брокера. Таким образом, когда вам нужно добавить тему, вам нужно добавить ее в веб-интерфейсе управления или через инструменты администратора.
Интеграция SpringBoot
RocketMQ в настоящее время не предоставляет стартер, интегрирующий SpringBoot, поэтому теперь доступ программируется путем внедрения клиентов. Давайте посмотрим на процесс интеграции SpringBoot с RocketMQ.
Познакомить с клиентскими зависимостями RocketMQ.
Последняя версия, которая в настоящее время обновляется на github, — это версия 4.7.0, и последняя версия используется здесь:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
Предоставляет классы автоматической настройки для производителей
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:17 PM
* @since:
**/
@Configuration
public class MQProducerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
@Value("${rocketmq.producer.groupName}")
private String groupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
@Bean
@ConditionalOnMissingBean
public DefaultMQProducer defaultMQProducer() throws RuntimeException {
DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
producer.setNamesrvAddr(this.namesrvAddr);
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//如果需要同一个 jvm 中不同的 producer 往不同的 mq 集群发送消息,需要设置不同的 instanceName
//producer.setInstanceName(instanceName);
//如果发送消息的最大限制
producer.setMaxMessageSize(this.maxMessageSize);
//如果发送消息超时时间
producer.setSendMsgTimeout(this.sendMsgTimeout);
//如果发送消息失败,设置重试次数,默认为 2 次
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
try {
producer.start();
LOGGER.info("producer is started. groupName:{}, namesrvAddr: {}", groupName, namesrvAddr);
} catch (MQClientException e) {
LOGGER.error("failed to start producer.", e);
throw new RuntimeException(e);
}
return producer;
}
}
- groupName: настройка для отправки сообщений одного и того же типа — это одна и та же группа, которая гарантированно будет уникальной. По умолчанию никаких настроек не требуется. Rocketmq будет использовать ip@pid (pid представляет имя jvm) в качестве уникального идентификатора.
- namesrvAddr: адрес сервера имен
- maxMessageSize: максимальный размер сообщения, по умолчанию 4M.
- sendMsgTimeout: время ожидания отправки сообщения, по умолчанию 3 секунды
- retryTimesWhenSendFailed: количество повторных попыток отправки сообщения, по умолчанию 2 раза.
Предоставляет класс автоматической настройки для потребителей
@Configuration
public class MQConsumerConfiguration {
public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
// 订阅指定的 topic
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Autowired
private MessageListenerHandler mqMessageListenerProcessor;
@Bean
@ConditionalOnMissingBean
public DefaultMQPushConsumer defaultMQPushConsumer() throws RuntimeException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);
// 设置 consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置消费模型,集群还是广播,默认为集群
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置一次消费消息的条数,默认为 1 条
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
// 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,使用*;
consumer.subscribe(topics, "*");
// 启动消费
consumer.start();
LOGGER.info("consumer is started. groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr);
} catch (Exception e) {
LOGGER.error("failed to start consumer . groupName:{}, topics:{}, namesrvAddr:{}",groupName,topics,namesrvAddr,e);
throw new RuntimeException(e);
}
return consumer;
}
}
Параметры относятся к разделу производителя выше. Конфигурация здесь — это только мониторинг активированного потребителя, а для конкретного потребления необходимо реализовать интерфейс MessageListenerConcurrently.
/**
* @author: guolei.sgl (glmapper_2018@163.com) 2020/4/5 5:21 PM
* @since:
**/
@Component
public class MessageListenerHandler implements MessageListenerConcurrently {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageListenerHandler.class);
private static String TOPIC = "DemoTopic";
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
LOGGER.info("receive blank msgs...");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
MessageExt messageExt = msgs.get(0);
String msg = new String(messageExt.getBody());
if (messageExt.getTopic().equals(TOPIC)) {
// mock 消费逻辑
mockConsume(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private void mockConsume(String msg){
LOGGER.info("receive msg: {}.", msg);
}
}
Отправить сообщение с помощью клиента
Логика использования клиента для отправки сообщений относительно проста, то есть получить объект DefaultMQProducer, вызвать метод отправки и поддерживать синхронные, асинхронные, односторонние и другие методы вызова.
@RestController
public class TestController {
private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);
private static String TOPIC = "DemoTopic";
private static String TAGS = "glmapperTags";
@Autowired
private DefaultMQProducer defaultMQProducer;
@RequestMapping("send")
public String test() throws Throwable {
Message msg = new Message(TOPIC, TAGS, ("Say Hello RocketMQ to Glmapper").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用客户端发送消息
SendResult sendResult = defaultMQProducer.send(msg);
LOGGER.info("sendResult: {}.",sendResult);
return "SUCCESS";
}
}
контрольная работа
Тестовое приложение здесь должно объединить производственную и потребительскую стороны, поэтому конфигурация выглядит следующим образом:
spring.application.name=test-rocket
server.port=8008
#producer
rocketmq.producer.isOnOff=on #该应用是否启用生产者
rocketmq.producer.groupName=${spring.application.name}
rocketmq.producer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.producer.maxMessageSize=4096
rocketmq.producer.sendMsgTimeout=3000
rocketmq.producer.retryTimesWhenSendFailed=2
#consumer
rocketmq.consumer.isOnOff=on #该应用是否启用消费者
rocketmq.consumer.groupName=${spring.application.name}
rocketmq.consumer.namesrvAddr=sofa.cloud.alipay.net:9876
rocketmq.consumer.topics=DemoTopic
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
rocketmq.consumer.consumeMessageBatchMaxSize=1
Запустите программу и посмотрите вывод журнала:
2020-04-05 22:53:15.141 INFO 46817 --- [ main] c.g.b.b.c.MQProducerConfiguration : producer is started. groupName:test-rocket, namesrvAddr: sofa.cloud.alipay.net:9876
2020-04-05 22:53:15.577 INFO 46817 --- [ main] c.g.b.b.c.MQConsumerConfiguration : consumer is started. groupName:test-rocket, topics:DemoTopic, namesrvAddr:sofa.cloud.alipay.net:9876
Здесь видно, что автоматическая настройка производителей и потребителей вступила в силу и запуск завершен. Инициировать отправку сообщения с помощью curl localhost:8008/send:
2020-04-05 22:54:21.654 INFO 46817 --- [nio-8008-exec-1] c.g.b.boot.controller.TestController : sendResult: SendResult [sendStatus=SEND_OK, msgId=1E0FC3A2B6E118B4AAC21983B3C50000, offsetMsgId=64583D7C00002A9F0000000000011788, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=6], queueOffset=50].
2020-04-05 22:54:21.658 INFO 46817 --- [MessageThread_1] c.g.b.b.p.MessageListenerHandler : receive msg: Say Hello RocketMQ to Glmapper.
Смотрите журнал отправленных сообщений и журнал полученных сообщений.
Перехватывать сообщения с помощью хуков
RockKetMQ предоставляет два интерфейса ловушек: интерфейсы SendMessageHook и ConsumeMessageHook, которые можно использовать для перехвата сообщений до и после отправки сообщения, до и после потребления сообщения.В официальном документе нет описания этой части, так что приступим Давайте посмотрим, как используйте эти два интерфейса ловушек, чтобы что-то сделать.
SendMessageHook
Настройте ProducerTestHook с помощью следующего кода:
public class ProducerTestHook implements SendMessageHook {
public static final Logger LOGGER = LoggerFactory.getLogger(ProducerTestHook.class);
@Override
public String hookName() {
return ProducerTestHook.class.getName();
}
@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageBefore. sendMessageContext:{}", sendMessageContext);
}
@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
LOGGER.info("execute sendMessageAfter. sendMessageContext:{}", sendMessageContext);
}
}
В классе автоконфигурации вышеприведенного производителя зарегистрируйте ProducerTestHook у производителя.
// 注册 SendMessageHook
producer.getDefaultMQProducerImpl().registerSendMessageHook(new ProducerTestHook());
ConsumeMessageHook
Настройте ConsumerTestHook с помощью следующего кода:
public class ConsumerTestHook implements ConsumeMessageHook {
public static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTestHook.class);
@Override
public String hookName() {
return ConsumerTestHook.class.getName();
}
@Override
public void consumeMessageBefore(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageBefore. consumeMessageContext: {}",consumeMessageContext);
}
@Override
public void consumeMessageAfter(ConsumeMessageContext consumeMessageContext) {
LOGGER.info("execute consumeMessageAfter. consumeMessageContext: {}",consumeMessageContext);
}
}
В классе автоматической настройки потребителя выше зарегистрируйте ConsumerTestHook с потребителем.
// 注册 ConsumeMessageHook
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumerTestHook());
Результат выполнения следующий:
execute sendMessageBefore. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
execute sendMessageAfter. sendMessageContext:org.apache.rocketmq.client.hook.SendMessageContext@a50ea34
sendResult: SendResult [sendStatus=SEND_OK, msgId=0A0FE8F8C02F18B4AAC21C1275FB0000, offsetMsgId=64583D7C00002A9F0000000000011850, messageQueue=MessageQueue [topic=DemoTopic, brokerName=sofa.cloud.alipay.net, queueId=5], queueOffset=50].
execute consumeMessageBefore. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
receive msg: Say Hello RocketMQ to Glmapper.
execute consumeMessageAfter. consumeMessageContext: org.apache.rocketmq.client.hook.ConsumeMessageContext@6482209a
возникли некоторые проблемы
Несколько проблем, возникших в процессе интеграции, записываются следующим образом:
1. Брокер не запустился.
Ситуация, с которой я столкнулся во время тестирования, заключается в том, что после запуска сервера имен при запуске Boker соединение ssh напрямую выдает сообщение об ошибке подключения.dmesg | egrep -i -B100 'killed process'
Просмотрите запись процесса, который был убит, и получите следующий журнал:
[2257026.030741] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.031888] Killed process 100735 (sh) total-vm:15708kB, anon-rss:176kB, file-rss:1800kB, shmem-rss:0kB
[2257026.133506] Memory cgroup out of memory: Kill process 110719 (systemd) score 0 or sacrifice child
[2257026.133539] Killed process 100745 (vsar) total-vm:172560kB, anon-rss:22936kB, file-rss:1360kB, shmem-rss:0kB
[2257026.206872] Memory cgroup out of memory: Kill process 104617 (java) score 3 or sacrifice child
[2257026.207742] Killed process 104617 (java) total-vm:9092924kB, anon-rss:4188528kB, file-rss:496kB, shmem-rss:0kB
Вывод, который можно сделать здесь, заключается в том, что произошел OOM, вызванный недостаточным выделением места при запуске (начальный параметр памяти в файле конфигурации по умолчанию слишком велик). Решение такое: после компиляции заходим в директорию дистрибутив/target/apache-rocketmq/bin и находим там два файла скрипта, runbroker.sh и runserver.sh.Эти два скрипта понимают, что параметры указанные по умолчанию при запуске очень большие ( 4 г / 8 г / 2 г), моя автономная тестовая машина всего 1c2 г, поэтому я соответствующим образом отрегулировал следующие параметры:
- runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
Перезапустите namerv и брокер после модификации, это нормально
$ jps
98633 Jps
55689 BrokerStartup
54906 NamesrvStartup
2. Нет информации о маршруте темы, xxx
Это упоминается в официальном FAQ, указывая на то, что частота встреч должна быть очень высокой. Официальный план можно объяснить подробноздесь rocketmq.apache.org/docs/faq/Статья 4. я пассЕсли вы не можете найти эту тему, создайте ее на брокере с помощью команды updateTopic инструментов администратора или веб-консоли.Это решает:
sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t DemoTopic # 执行此指令,创建 DemoTopic
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
create topic to localhost:10911 success.
TopicConfig [topicName=DemoTopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
Суммировать
Я видел часть кода RocketMQ, когда работал над интегрированным компонентом сообщений SOFATracer, но в реальной работе он все же сэкономил много обходных путей. В целом, интеграция RocketMQ в SpringBoot относительно проста, поэтому давайте запишем это здесь. Если есть ошибка в описании в тексте, пожалуйста, оставьте сообщение, чтобы исправить ее.