1. Введение
Seckill — это, по сути, краткосрочная внезапная проблема с высоким одновременным доступом.Бизнес-характеристики следующие:
- Триггер времени, трафик внезапно увеличивается в одно мгновение
- Только часть запроса на всплеск часто бывает успешной
- Количество продукции seckill часто ограничено и не может быть перепродано, но допустимо продавать меньше
- Не требует немедленного возврата реального результата заказа
Эта статья в основном объясняет фактическое использование RocketMQ в сценарии seckill и не объясняет подробно другие бизнес-процессы seckill.
Ниже приведена вторая блок-схема уничтожения:
Чтобы понять конкретную реализацию, смотрите подробный код:Исходный код большого парня
2. Обзор бизнеса Seckill
Асинхронизировав основной бизнес-процесс seckill, мы можем разделить основной процесс на два этапа: получение и размещение заказов.
2.1 Получение второго процесса убийства
- Пользователь получает доступ к входу seckill, отправляет запрос seckill шлюзу получения платформы seckill, и платформа выполняет предварительную проверку запроса seckill.
- После прохождения проверки запрос заказа отправляется через промежуточный уровень, такой как кеш/очередь/пул потоков, и «очередь» возвращается пользователю одновременно с завершением доставки.
- Для запроса заказа, который не прошел предварительную проверку, он синхронно вернет второй заказ на уничтожение.
На этом взаимодействие с пользователем подошло к концу.
В процессе получения заказа заказ seckill размещается на среднем уровне RocketMQ.
2.2 Второй процесс уничтожения — порядок
В процессе заказа нагрузка на платформу фактически значительно снизилась за счет буферизации среднего уровня. Причина этого в том, что, с одной стороны, некоторые незаконные запросы отфильтровываются в процессе проверки синхронизации заказа пользователя. ; с другой стороны, мы выполняем некоторые операции, такие как ограничение скорости и давление заказа на запрос заказа, выполняя некоторые текущие ограничения, фильтрацию и другую логику на среднем уровне, и медленно перевариваем запрос заказа внутри, чтобы минимизировать влияние трафика на уровне сохраняемости платформы. Это на самом деле отражает средний слой«Срезайте вершины и заполняйте долины»специальность.
Основываясь на вышеприведенной предпосылке, мы кратко резюмируем бизнес-логику части заказа seckill.
- Служба заказа seckill получает запрос заказа среднего уровня и выполняет реальную проверку предварительного заказа Здесь в основном выполняется реальная проверка инвентаря.
- После успешного вычета запасов (или блокировки запасов) начните реальную операцию заказа. Вычет запасов (блокировка запасов) и размещение заказа обычно находятся в одном домене транзакции.
- После того, как заказ будет успешно размещен, платформа часто инициирует push-сообщение, чтобы информировать пользователя о том, что заказ выполнен успешно, и подсказывает пользователю совершить платежную операцию.
- Если пользователь не платит в течение определенного периода времени (например, 30 минут), заказ будет отменен, запасы будут восстановлены, а другим пользователям в очереди будут предоставлены возможности покупки.
- Если пользователь совершит успешную оплату, статус заказа будет обновлен, а поток заказов будет передан другим подсистемам.
На данный момент это в основном основной процесс бизнеса шипов.
дальнейшая абстракцияЗапрос Seckill -> средний уровень -> реальный заказПохож ли этот сценарий на асинхронный режим бизнес-процессов, который мы часто используем?
Я верю, что вы, у кого есть сердце, видели это, да, это«Производитель-Потребитель»модель.
Схема «производитель-потребитель» В процессе, часто черезочередь блокировкиили"подождите-уведомить"Такие механизмы реализуются, и между службами часто реализуются через очереди сообщений, что также является техническим методом реализации, используемым в этом реальном бою. В этой статье будет использоваться очередь сообщений RocketMQ для разделения заказов, размещенных за секунды, для достижения цели сглаживания пиков и заполнения впадин и повышения пропускной способности системы.
Далее я объясню, как использовать RocketMQ для реализации описанных выше сценариев.
3. Настоящий бой
3.1 Структура
- Пользователь получает доступ к seckill-шлюзу seckill-gateway-service и инициирует операцию seckill для интересующего продукта. В частности, информация о товаре загружается в seckill-gateway-service при инициализации системы. При выполнении прединвентаризационной проверки трафик заказа пользователя был отфильтрован один раз в соответствии с кешем.
- После того, как шлюз проведет достаточную предварительную проверку заказа seckill, он доставляет сообщение заказа seckill в RocketMQ и синхронно возвращает очередь пользователю.
- Платформа заказа seckill seckill-order-service подписывается на сообщение заказа seckill, выполняет идемпотентную обработку сообщения и выполняет операцию реального заказа после проверки запасов продукта.
3.2 Структура базы данных
3.3 Конфигурация сервера имен
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class MQNamesrvConfig {
@Value("${rocketmq.nameServer.offline}")
String offlineNamesrv;
@Value("${rocketmq.nameServer.aliyun}")
String aliyunNamesrv;
/**
* 根据环境选择nameServer地址
* @return
*/
public String nameSrvAddr() {
String envType = System.getProperty("envType");
//System.out.println(envType);
if (StringUtils.isBlank(envType)) {
throw new IllegalArgumentException("please insert envType");
}
switch (envType) {
case "offline" : {
return offlineNamesrv;
}
case "aliyun" : {
return aliyunNamesrv;
}
default : {
throw new IllegalArgumentException("please insert right envType, offline/aliyun");
}
}
}
}
3.4 Протокол сообщений
Здесь самокодирование и декодирование протокола сообщения реализовано путем реализации шаблонных методов кодирования и декодирования BaseMsg (представляющих кодирование и декодирование сообщения соответственно) и путем установки свойств объекта this.
/**
* @desc 基础协议类
*/
public abstract class BaseMsg {
public Logger LOGGER = LoggerFactory.getLogger(this.getClass());
/**版本号,默认1.0*/
private String version = "1.0";
/**主题名*/
private String topicName;
public abstract String encode();
public abstract void decode(String msg);
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
@Override
public String toString() {
return "BaseMsg{" +
"version='" + version + '\'' +
", topicName='" + topicName + '\'' +
'}';
}
}
/**
* @className OrderNofityProtocol
* @desc 订单结果通知协议
*/
public class ChargeOrderMsgProtocol extends BaseMsg implements Serializable {
private static final long serialVersionUID = 73717163386598209L;
/**订单号*/
private String orderId;
/**用户下单手机号*/
private String userPhoneNo;
/**商品id*/
private String prodId;
/**用户交易金额*/
private String chargeMoney;
private Map<String, String> header;
private Map<String, String> body;
@Override
public String encode() {
// 组装消息协议头
ImmutableMap.Builder headerBuilder = new ImmutableMap.Builder<String, String>()
.put("version", this.getVersion())
.put("topicName", MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic());
header = headerBuilder.build();
body = new ImmutableMap.Builder<String, String>()
.put("orderId", this.getOrderId())
.put("userPhoneNo", this.getUserPhoneNo())
.put("prodId", this.getProdId())
.put("chargeMoney", this.getChargeMoney())
.build();
ImmutableMap<String, Object> map = new ImmutableMap.Builder<String, Object>()
.put("header", header)
.put("body", body)
.build();
// 返回序列化消息Json串
String ret_string = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
ret_string = objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new RuntimeException("ChargeOrderMsgProtocol消息序列化json异常", e);
}
return ret_string;
}
@Override
public void decode(String msg) {
Preconditions.checkNotNull(msg);
ObjectMapper mapper = new ObjectMapper();
try {
JsonNode root = mapper.readTree(msg);
// header
this.setVersion(root.get("header").get("version").asText());
this.setTopicName(root.get("header").get("topicName").asText());
// body
this.setOrderId(root.get("body").get("orderId").asText());
this.setUserPhoneNo(root.get("body").get("userPhoneNo").asText());
this.setChargeMoney(root.get("body").get("chargeMoney").asText());
this.setProdId(root.get("body").get("prodId").asText());
} catch (IOException e) {
throw new RuntimeException("ChargeOrderMsgProtocol消息反序列化异常", e);
}
}
public String getOrderId() {
return orderId;
}
public ChargeOrderMsgProtocol setOrderId(String orderId) {
this.orderId = orderId;
return this;
}
public String getUserPhoneNo() {
return userPhoneNo;
}
public ChargeOrderMsgProtocol setUserPhoneNo(String userPhoneNo) {
this.userPhoneNo = userPhoneNo;
return this;
}
public String getProdId() {
return prodId;
}
public ChargeOrderMsgProtocol setProdId(String prodId) {
this.prodId = prodId;
return this;
}
public String getChargeMoney() {
return chargeMoney;
}
public ChargeOrderMsgProtocol setChargeMoney(String chargeMoney) {
this.chargeMoney = chargeMoney;
return this;
}
@Override
public String toString() {
return "ChargeOrderMsgProtocol{" +
"orderId='" + orderId + '\'' +
", userPhoneNo='" + userPhoneNo + '\'' +
", prodId='" + prodId + '\'' +
", chargeMoney='" + chargeMoney + '\'' +
", header=" + header +
", body=" + body +
"} " + super.toString();
}
}
3.5 Инициализация производителя Instant Kill Order
Загружается @PostConstruct (т.е. init())
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.gateway.common.config.MQNamesrvConfig;
import org.apache.rocketmq.gateway.common.util.LogExceptionWapper;
import org.apache.rocketmq.message.constant.MessageProtocolConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @className SecKillChargeOrderProducer
* @desc 秒杀订单生产者初始化
*/
@Component
public class SecKillChargeOrderProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderProducer.class);
@Autowired
MQNamesrvConfig namesrvConfig;
@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;
@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;
private DefaultMQProducer defaultMQProducer;
@PostConstruct
public void init() {
defaultMQProducer =
new DefaultMQProducer
(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getProducerGroup(),
new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
// 发送失败重试次数
defaultMQProducer.setRetryTimesWhenSendFailed(3);
try {
defaultMQProducer.start();
} catch (MQClientException e) {
LOGGER.error("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
throw new RuntimeException("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!", e);
}
LOGGER.info("[秒杀订单生产者]--SecKillChargeOrderProducer加载完成!");
}
public DefaultMQProducer getProducer() {
return defaultMQProducer;
}
}
3.6 Ввод заказа Seckill (производитель)
/**
* 平台下单接口
* @param chargeOrderRequest
* @return
*/
@RequestMapping(value = "charge.do", method = {RequestMethod.POST})
public @ResponseBody Result chargeOrder(@ModelAttribute ChargeOrderRequest chargeOrderRequest) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String sessionId = attributes.getSessionId();
// 下单前置参数校验
if (!secKillChargeService.checkParamsBeforeSecKillCharge(chargeOrderRequest, sessionId)) {
return Result.error(CodeMsg.PARAM_INVALID);
}
// 前置商品校验
String prodId = chargeOrderRequest.getProdId();
if (!secKillChargeService.checkProdConfigBeforeKillCharge(prodId, sessionId)) {
return Result.error(CodeMsg.PRODUCT_NOT_EXIST);
}
// 前置预减库存
if (!secKillProductConfig.preReduceProdStock(prodId)) {
return Result.error(CodeMsg.PRODUCT_STOCK_NOT_ENOUGH);
}
// 秒杀订单入队
return secKillChargeService.secKillOrderEnqueue(chargeOrderRequest, sessionId);
}
Режиссер:secKillChargeService::secKillOrderEnqueue
/**
* 秒杀订单入队
* @param chargeOrderRequest
* @param sessionId
* @return
*/
@Override
public Result secKillOrderEnqueue(ChargeOrderRequest chargeOrderRequest, String sessionId) {
// 订单号生成,组装秒杀订单消息协议
String orderId = UUID.randomUUID().toString();
String phoneNo = chargeOrderRequest.getUserPhoneNum();
//消息封装
ChargeOrderMsgProtocol msgProtocol = new ChargeOrderMsgProtocol();
msgProtocol.setUserPhoneNo(phoneNo)
.setProdId(chargeOrderRequest.getProdId())
.setChargeMoney(chargeOrderRequest.getChargePrice())
.setOrderId(orderId);
String msgBody = msgProtocol.encode();
LOGGER.info("秒杀订单入队,消息协议={}", msgBody);
DefaultMQProducer mqProducer = secKillChargeOrderProducer.getProducer();
// 组装RocketMQ消息体
Message message = new Message(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), msgBody.getBytes());
try {
// 消息发送
SendResult sendResult = mqProducer.send(message);
//判断SendStatus
if (sendResult == null) {
LOGGER.error("sessionId={},秒杀订单消息投递失败,下单失败.msgBody={},sendResult=null", sessionId, msgBody);
return Result.error(CodeMsg.BIZ_ERROR);
}
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
LOGGER.error("sessionId={},秒杀订单消息投递失败,下单失败.msgBody={},sendResult=null", sessionId, msgBody);
return Result.error(CodeMsg.BIZ_ERROR);
}
ChargeOrderResponse chargeOrderResponse = new ChargeOrderResponse();
BeanUtils.copyProperties(msgProtocol, chargeOrderResponse);
LOGGER.info("sessionId={},秒杀订单消息投递成功,订单入队.出参chargeOrderResponse={},sendResult={}", sessionId, chargeOrderResponse.toString(), JSON.toJSONString(sendResult));
return Result.success(CodeMsg.ORDER_INLINE, chargeOrderResponse);
} catch (Exception e) {
int sendRetryTimes = mqProducer.getRetryTimesWhenSendFailed();
LOGGER.error("sessionId={},sendRetryTimes={},秒杀订单消息投递异常,下单失败.msgBody={},e={}", sessionId, sendRetryTimes, msgBody, LogExceptionWapper.getStackTrace(e));
}
return Result.error(CodeMsg.BIZ_ERROR);
}
3.7 Мгновенное потребление убийств
3.7.1 Определение клиента-потребителя
Мгновенно убивать клиентов, размещающих заказы
@Component
public class SecKillChargeOrderConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderConsumer.class);
@Autowired
MQNamesrvConfig namesrvConfig;
@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;
@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;
private DefaultMQPushConsumer defaultMQPushConsumer;
@Resource(name = "secKillChargeOrderListenerImpl")
private MessageListenerConcurrently messageListener;
@PostConstruct
public void init() {
defaultMQPushConsumer =
new DefaultMQPushConsumer(
MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getConsumerGroup(),
new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
// 平均分配队列算法,hash
new AllocateMessageQueueAveragely());
defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
// 从头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 消费模式:集群模式
// 集群:同一条消息 只会被一个消费者节点消费到
// 广播:同一条消息 每个消费者都会消费到
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
defaultMQPushConsumer.registerMessageListener(messageListener);
// 设置每次拉取的消息量,默认为1
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
// 订阅所有消息
try {
defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
// 启动消费者
defaultMQPushConsumer.start();
} catch (MQClientException e) {
LOGGER.error("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
throw new RuntimeException("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!", e);
}
LOGGER.info("[秒杀下单消费者]--SecKillChargeOrderConsumer加载完成!");
}
}
3.7.2 Реализовать базовую логику получения seckill
Реализовать логику ядра эквайринга seckill, то есть реализовать собственный MessageListenerConcurrently.
@Component
public class SecKillChargeOrderListenerImpl implements MessageListenerConcurrently {
private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderListenerImpl.class);
@Resource(name = "secKillOrderService")
SecKillOrderService secKillOrderService;
@Autowired
SecKillProductService secKillProductService;
/**
* 秒杀核心消费逻辑
* @param msgs
* @param context
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 消息解码
String message = new String(msg.getBody());
int reconsumeTimes = msg.getReconsumeTimes();
String msgId = msg.getMsgId();
String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;
LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-接收到消息,message={},{}", message, logSuffix);
// 反序列化协议实体
ChargeOrderMsgProtocol chargeOrderMsgProtocol = new ChargeOrderMsgProtocol();
chargeOrderMsgProtocol.decode(message);
LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-反序列化为秒杀入库订单实体chargeOrderMsgProtocol={},{}", chargeOrderMsgProtocol.toString(), logSuffix);
// 消费幂等:查询orderId对应订单是否已存在
String orderId = chargeOrderMsgProtocol.getOrderId();
OrderInfoDobj orderInfoDobj = secKillOrderService.queryOrderInfoById(orderId);
if (orderInfoDobj != null) {
LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-当前订单已入库,不需要重复消费!,orderId={},{}", orderId, logSuffix);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 业务幂等:同一个prodId+同一个userPhoneNo只有一个秒杀订单
OrderInfoDO orderInfoDO = new OrderInfoDO();
orderInfoDO.setProdId(chargeOrderMsgProtocol.getProdId())
.setUserPhoneNo(chargeOrderMsgProtocol.getUserPhoneNo());
Result result = secKillOrderService.queryOrder(orderInfoDO);
if (result != null && result.getCode().equals(CodeMsg.SUCCESS.getCode())) {
LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-当前用户={},秒杀的产品={}订单已存在,不得重复秒杀,orderId={}",
orderInfoDO.getUserPhoneNo(), orderInfoDO.getProdId(), orderId);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 秒杀订单入库
OrderInfoDO orderInfoDODB = new OrderInfoDO();
BeanUtils.copyProperties(chargeOrderMsgProtocol, orderInfoDODB);
// 库存校验
String prodId = chargeOrderMsgProtocol.getProdId();
SecKillProductDobj productDobj = secKillProductService.querySecKillProductByProdId(prodId);
// 取库存校验
int currentProdStock = productDobj.getProdStock();
if (currentProdStock <= 0) {
LOGGER.info("[decreaseProdStock]当前商品已售罄,消息消费成功!prodId={},currStock={}", prodId, currentProdStock);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 正式下单
if (secKillOrderService.chargeSecKillOrder(orderInfoDODB)) {
LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-秒杀订单入库成功,消息消费成功!,入库实体orderInfoDO={},{}", orderInfoDO.toString(), logSuffix);
// 模拟订单处理,直接修改订单状态为处理中
secKillOrderService.updateOrderStatusDealing(orderInfoDODB);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} catch (Exception e) {
LOGGER.info("[秒杀订单消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
3.7.3 Фактическое хранение второго убийства
Фактическая операция заказа и фактический вычет запасов находятся в одной и той же локальной транзакции.
/**
* 秒杀订单入库
* @param orderInfoDO
* @return
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean chargeSecKillOrder(OrderInfoDO orderInfoDO) {
int insertCount = 0;
String orderId = orderInfoDO.getOrderId();
String prodId = orderInfoDO.getProdId();
// 减库存
if (!secKillProductService.decreaseProdStock(prodId)) {
LOGGER.info("[insertSecKillOrder]orderId={},prodId={},下单前减库存失败,下单失败!", orderId, prodId);
// TODO 此处可给用户发送通知,告知秒杀下单失败,原因:商品已售罄
return false;
}
// 设置产品名称
SecKillProductDobj productInfo = secKillProductService.querySecKillProductByProdId(prodId);
orderInfoDO.setProdName(productInfo.getProdName());
try {
insertCount = secKillOrderMapper.insertSecKillOrder(orderInfoDO);
} catch (Exception e) {
LOGGER.error("[insertSecKillOrder]orderId={},秒杀订单入库[异常],事务回滚,e={}", orderId, LogExceptionWapper.getStackTrace(e));
String message =
String.format("[insertSecKillOrder]orderId=%s,秒杀订单入库[异常],事务回滚", orderId);
throw new RuntimeException(message);
}
if (insertCount != 1) {
LOGGER.error("[insertSecKillOrder]orderId={},秒杀订单入库[失败],事务回滚,e={}", orderId);
String message =
String.format("[insertSecKillOrder]orderId=%s,秒杀订单入库[失败],事务回滚", orderId);
throw new RuntimeException(message);
}
return true;
}
4. Резюме и ссылки
резюме
Посмотрите еще раз на блок-схему и посмотрите на исходный код, если вы его не понимаете.
Для таких операций, как оплата и логистика после размещения заказа, RocketMQ можно использовать для асинхронной обработки.
Весь код в этой статье взят изИсходный код большого парня
Просто чтобы учиться и изучать реальный бой RocketMQ.