предисловие
Основные особенности MQ:разъединение,асинхронный,отсечение пика, эта статья в основном записывает и публикует личный RocketMQ в реальных проектах.отсечение пикаиспользование, используемое для снижения нагрузки на базу данных в бизнес-сценариях, где концепции основных компонентов RocketMQ следующие:
- Производитель: производит отправку сообщений
- Брокер: хранит сообщения, отправленные продюсером.
- Потребитель: получать сообщения от брокера и потреблять их.
- Inserver: для брокера для производителя или маршрутизации потребителей
Среди них процесс потребления имеет следующие моменты, на которые необходимо обратить внимание:
- Потребитель RocketMQ получает сообщения, отправляя запросы на вытягивание Брокеру, а не так, как Брокер отправляет Потребителю для получения.
- Каждый раз, когда Consumer тянет сообщение, сообщение будет равномерно распределено в очередь сообщений, а затем передано, поэтому многие параметры в RocketMQ относятся к очереди, а не к теме (это ключевой момент, кстати, документ с исходным кодом действительно непонятно, многие требуют проб и ошибок, но Dashboard делает свою работу хорошо), где количество каждой очереди сообщений Broker (ConsumeQueue) можно настроить в реальном времени через RocketMQ DashBoard.
rocketmq-spring-boot-starterВведение
Подумайте об использовании RocketMQ, когда вам нужно быстро интегрировать RocketMQ в разработку.rocketmq-spring-boot-starterСоздайте интегрированную среду RocketMQ, но инфраструктура не полностью поддерживает все упрощения конфигурации RocketMQ.Если вам нужно использовать сообщения в пакетах, вам нужно настроить bean-компонент DefaultMQPushConsumer для использования.
Лично использовал в разработкеrocketmq-spring-boot-starter
Связанные классы:
-
RocketMQListener
Интерфейс: потребители должны реализовать метод потребления этого интерфейса.onMessage(msg)
. -
RocketMQPushConsumerLifecycleListener
интерфейс: когда@RocketMQMessageListener
Когда конфигурации недостаточно для удовлетворения наших потребностей, мы можем реализовать этот интерфейс, чтобы напрямую изменить потребительский класс.DefaultMQPushConsumer
настроить -
@RocketMQMessageListener
: Аннотируется этой аннотацией и реализует интерфейсRocketMQListener
Бин является потребителем и прослушивает сообщения в указанной очереди темы.Аннотация содержит некоторые общие конфигурации потребителей (большинство из них могут быть по умолчанию).Как правило, вам нужно только изменить ConsumerGroup (группу потребителей) и тему.RocketMQMessageListener
Конфигурацию атрибута можно получить из файла конфигурации или центра конфигурации с помощью Placeholder (заполнитель), как показано на следующем рисунке:
бизнес-кейс
Есть подобный бизнес, который не ограничивает количество лайков пользователей, а только требует записи (требования к продукту, предложения по развитию недействительны), когда каждый пользователь выполняет x последовательных кликов, чтобы насладиться кайфом от всплеска числа, если база данных должна выполнить x точек Как и вставка данных, база данных, несомненно, будет заблокирована и вызовет сбой. Вот я и подумал, что можно попробовать пиковое шейвинг MQ.Например, 5000 сообщений в секунду, а база данных выдерживает только 2000. Тогда мне нужно будет тянуть только 1600 каждый раз, когда я потребляю, а остальное будет аккумулироваться в Брокере и съедается медленно.. Поскольку предыдущий центр сообщений также использовал RocketMQ, было подтверждено, что RocketMQ использовался для сглаживания пиков.
Конфигурация среды
Пример среды для статьи: 1NameServer + 2Broker + 1Consumer
добавить зависимости maven
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
конфигурация application.yml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: praise-group
server:
port: 10000
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: tiger
url: jdbc:mysql://localhost:3306/wilson
swagger:
docket:
base-package: io.rocket.consumer.controller
Нравится интерфейс
PraiseRecord (как запись):
@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
}
MessageController (простой тестовый интерфейс):
RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}
// ......
}
Поскольку пользователи могут лайкать непрерывно, считается, что обработка подобных сообщений может быть смягчена (чтобы позволить потерю сообщений) для достижения более высокой производительности, поэтому выберите использованиеsendOneyWay()
Отправить сообщение.
Методы отправки сообщений RocketMQ в основном включают синхронную отправку syncSend(), асинхронную отправку asyncSend() и sendOneWay()., но пропускная способность выше, что необходимо выбирать в соответствии с бизнес-ситуацией. Производительность: sendOneWay > asyncSend > syncSend Метод send() RocketMQTemplate по умолчанию является синхронным (syncSend) Дополнительные сведения см. в реализации исходного кода.
PraiseListener: Нравится потребитель сообщений
@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;
@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的间隔,单位为毫秒
consumer.setPullInterval(2000);
// 设置每次从队列中拉取的消息数为16
consumer.setPullBatchSize(16);
}
}
Брокер хранит максимальное количество одиночных сообщений pull.MessageStoreConfig.maxTransferCountOnMessageInMemory
(по умолчанию 32) ограничение значения, то есть, если вы хотите, чтобы количество сообщений, вытягиваемых потребителями из очереди, было больше 32 действительных (pullBatchSize>32), вам необходимо изменить параметры запуска БрокераmaxTransferCountOnMessageInMemory
ценность. В параметрах конфигурации ограничения пика MQ следующееDefaultMQPushConsumer
На параметры нужно обращать внимание:
- pullInterval: интервал между каждым пулл-сообщением от брокера в миллисекундах.
- pullBatchSize: количество сообщений, каждый раз извлекаемых из очереди брокера.Этот параметр легко понять неправильно.Сначала я подумал, что это общее количество сообщений, извлекаемых каждый раз, но после нескольких тестов было подтверждено, что это на самом деле из каждой очереди.Количество подтягиваний (комментарии к исходному коду действительно скудные, равно как и нет), то есть общее количество сообщений, подтягиваемых Consume каждый раз, следующее:
EachPullTotal=所有Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize
- потребленияMessageBatchMaxSize: максимальное количество сообщений на потребление (то есть объединение нескольких сообщений в потребление списка), значение по умолчанию равно 1,rocketmq-spring-boot-starterПакетное потребление в настоящее время не поддерживается (версия 2.1.0)
Когда потребитель начинает потребление сообщений, он сначала извлекает сообщение из каждой очереди для потребления, а затем извлекает его на число pullBatchSize каждый раз после успешного потребления.
В PraiseListener интервал для каждого извлечения установлен на 2 с, а количество сообщений, извлекаемых из очереди каждый раз, равно 16. вытаскивается каждый раз 16 * 2 * 4 = 128, после вытягивания 1 сообщения из каждой очереди в первый раз (то есть всего 8 сообщений), после успешного потребления, он будет вытягивать до 128 сообщений каждый раз для потребления , Если вы хотите проверить это, вы можете изменить вставку () из onMessage () на log.info («1»), а затем подсчитать, равно ли количество журналов, напечатанных в секунду, 128.
Согласно приведенной выше конфигурации, теоретическое потребление за 2 с составляет 128 в случае одного Потребителя, то есть каждые две секунды в базу данных добавляется около 128 новых лайков, а 20% отклонения находится в допустимом диапазоне индивидуальный, а затем подобный интерфейс прост.Стресс-тест 1s 2000 запрашивает проверку эффекта MQ.В соответствии с конфигурацией потребления теоретически для потребления требуется 16 извлечений и 32 с.После стресс-теста проверьте эффект проверки базы данных :
Как динамически изменять эффективность потребления, когда он в сети, но оценка эффективности потребления неверна?
Когда для параметра pullBatchSize задано максимальное значение передачи брокера по умолчанию, равное 32, и онлайн не хочет перезапускать брокера для изменения параметра maxTransferCountOnMessageInMemory, если есть два брокера и очередь равна 4, тогда стоимость извлечения для выхода эффективность составляет 32 * 2 * 4 = 256. Если вы хотите настроить динамически, вы можете начать с количества брокеров или количества очередей брокера.Вы можете увеличить writeQueueNums и readQueueNums брокера.Если они оба изменены на 8 , то КПД становится 32 * 2 * 8 = 512. Следует отметить, что после изменения очередей вы должны зайти в МЕНЕДЖЕР ПОТРЕБИТЕЛЕЙ в теме Dashboard, чтобы проверить, все ли новые очереди имеют успешно зарегистрированных потребителей, потому что вы столкнулись с использованием Rocketmq-spring-boot-starter@ RocketMQListener в тестировании и производстве Отметьте ситуацию, что потребители не будут автоматически зарегистрированы в новую очередь, но не исключает, что это из-за версии RocketMQ (персональная локальная версия - небольшая версия 0.0.1 выше, чем среда , и нет локальной регистрации потребителя в новой очереди). проблемы в очереди), но альтернатива использованию пользовательского bean-компонента DefaultMQPushConsumer (собственный способ не представляет проблемы). Когда новое приложение-потребитель будет перезапущено, МЕНЕДЖЕР ПОТРЕБИТЕЛЕЙ (ниже) появится в строке очереди количество новых потребителей * количество очередей каждого брокера и сумма.
Как использовать массовое потребление RocketMQ?
Хотя TPS достиг текущих требований бизнес-индекса после использования одиночной вставки MQ для аналогичного бизнеса, учитывая, что если последующим требованием является увеличение TPS без добавления количества машин, а объем данных еще не достиг уровня подбазы данных и подтаблицы, я планирую начать с пакетного потребления и перейти от вставки одной похожей записи за раз к вставке нескольких записей за раз (insertBatch). Конечно, если это может удовлетворить существующие потребности, это точно не будет сделано, чрезмерная оптимизация слишком большая помеха, но было бы неплохо подумать о других решениях. Rocketmq-spring-boot-starter не предоставляет функцию пакетного потребления, поэтому для потребления сообщений пакетами необходимо настроитьDefaultMQPushConsumer
и настроить егоconsumeMessageBatchMaxSize
Атрибуты.consumeMessageBatchMaxSize
Значение атрибута по умолчанию равно 1, то есть одновременно потребляется только одно сообщение.Следует отметить, что на этот атрибут также будет влиятьpullBatchSize
повлиять, еслиconsumeMessageBatchMaxSize
32 ноpullBatchSize
Если это только 12, то максимальное количество сообщений, используемых в пакетах, составляет только 12.
Ниже приведен тестовый компонент, который использует Consumer пакетами для отдельных тестов:
@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
// 设置每次消息拉取的时间间隔,单位毫秒
consumer.setPullInterval(1000);
// 设置每个队列每次拉取的最大消息数
consumer.setPullBatchSize(24);
// 设置消费者单次批量消费的消息数目上限
consumer.setConsumeMessageBatchMaxSize(12);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
-> {
List<UserInfo> userInfos = new ArrayList<>(msgs.size());
Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
msgs.forEach(msg -> {
userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
});
log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
/*
处理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos);
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
return consumer;
}
Если размер userInfo, печатаемый журналом, всегда равен 1 в конфигурации по умолчанию, но из-за настройкиconsumeMessageBatchMaxSize
иpullBatchSize
,иpullBatchSize
Небольшой, поэтому максимальное количество сообщений, потребляемых каждый раз, равно 12, как показано на следующем рисунке:
Прикрепленная информация об этой статье
- Убедитесь, что mqnamesrv и mqbroker успешно запущены, например, запуск среды статьи:
mqnamesrv -n 127.0.0.1:9876 mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties
- Процесс запуска RocketMQ DashBoard может относиться кОфициальная документация на гитхабеили кмои ресурсыЗагрузите пакет jar и запустите его.
- Адрес источника, каталог 2m-noslave — это конфигурация брокера 2master и сценарий запуска в примере в этой статье, а каталог spring-boot-consumer-peak — это фактический пример, содержащий соответствующий код статьи.