Битва RocketMQ — сценарий с высоким параллелизмом

RocketMQ

1. Введение

Seckill — это, по сути, краткосрочная внезапная проблема с высоким одновременным доступом.Бизнес-характеристики следующие:

  1. Триггер времени, трафик внезапно увеличивается в одно мгновение
  2. Только часть запроса на всплеск часто бывает успешной
  3. Количество продукции seckill часто ограничено и не может быть перепродано, но допустимо продавать меньше
  4. Не требует немедленного возврата реального результата заказа

Эта статья в основном объясняет фактическое использование RocketMQ в сценарии seckill и не объясняет подробно другие бизнес-процессы seckill.

Ниже приведена вторая блок-схема уничтожения:

秒杀流程图

Чтобы понять конкретную реализацию, смотрите подробный код:Исходный код большого парня

2. Обзор бизнеса Seckill

Асинхронизировав основной бизнес-процесс seckill, мы можем разделить основной процесс на два этапа: получение и размещение заказов.

2.1 Получение второго процесса убийства

  1. Пользователь получает доступ к входу seckill, отправляет запрос seckill шлюзу получения платформы seckill, и платформа выполняет предварительную проверку запроса seckill.
  2. После прохождения проверки запрос заказа отправляется через промежуточный уровень, такой как кеш/очередь/пул потоков, и «очередь» возвращается пользователю одновременно с завершением доставки.
  3. Для запроса заказа, который не прошел предварительную проверку, он синхронно вернет второй заказ на уничтожение.

На этом взаимодействие с пользователем подошло к концу.

В процессе получения заказа заказ seckill размещается на среднем уровне RocketMQ.

2.2 Второй процесс уничтожения — порядок

В процессе заказа нагрузка на платформу фактически значительно снизилась за счет буферизации среднего уровня. Причина этого в том, что, с одной стороны, некоторые незаконные запросы отфильтровываются в процессе проверки синхронизации заказа пользователя. ; с другой стороны, мы выполняем некоторые операции, такие как ограничение скорости и давление заказа на запрос заказа, выполняя некоторые текущие ограничения, фильтрацию и другую логику на среднем уровне, и медленно перевариваем запрос заказа внутри, чтобы минимизировать влияние трафика на уровне сохраняемости платформы. Это на самом деле отражает средний слой«Срезайте вершины и заполняйте долины»специальность.

Основываясь на вышеприведенной предпосылке, мы кратко резюмируем бизнес-логику части заказа seckill.

  1. Служба заказа seckill получает запрос заказа среднего уровня и выполняет реальную проверку предварительного заказа Здесь в основном выполняется реальная проверка инвентаря.
  2. После успешного вычета запасов (или блокировки запасов) начните реальную операцию заказа. Вычет запасов (блокировка запасов) и размещение заказа обычно находятся в одном домене транзакции.
  3. После того, как заказ будет успешно размещен, платформа часто инициирует push-сообщение, чтобы информировать пользователя о том, что заказ выполнен успешно, и подсказывает пользователю совершить платежную операцию.
  4. Если пользователь не платит в течение определенного периода времени (например, 30 минут), заказ будет отменен, запасы будут восстановлены, а другим пользователям в очереди будут предоставлены возможности покупки.
  5. Если пользователь совершит успешную оплату, статус заказа будет обновлен, а поток заказов будет передан другим подсистемам.

На данный момент это в основном основной процесс бизнеса шипов.

дальнейшая абстракцияЗапрос Seckill -> средний уровень -> реальный заказПохож ли этот сценарий на асинхронный режим бизнес-процессов, который мы часто используем?

Я верю, что вы, у кого есть сердце, видели это, да, это«Производитель-Потребитель»модель.

Схема «производитель-потребитель» В процессе, часто черезочередь блокировкиили"подождите-уведомить"Такие механизмы реализуются, и между службами часто реализуются через очереди сообщений, что также является техническим методом реализации, используемым в этом реальном бою. В этой статье будет использоваться очередь сообщений RocketMQ для разделения заказов, размещенных за секунды, для достижения цели сглаживания пиков и заполнения впадин и повышения пропускной способности системы.

Далее я объясню, как использовать RocketMQ для реализации описанных выше сценариев.

3. Настоящий бой

3.1 Структура

秒杀结构

  1. Пользователь получает доступ к seckill-шлюзу seckill-gateway-service и инициирует операцию seckill для интересующего продукта. В частности, информация о товаре загружается в seckill-gateway-service при инициализации системы. При выполнении прединвентаризационной проверки трафик заказа пользователя был отфильтрован один раз в соответствии с кешем.
  2. После того, как шлюз проведет достаточную предварительную проверку заказа seckill, он доставляет сообщение заказа seckill в RocketMQ и синхронно возвращает очередь пользователю.
  3. Платформа заказа seckill seckill-order-service подписывается на сообщение заказа seckill, выполняет идемпотентную обработку сообщения и выполняет операцию реального заказа после проверки запасов продукта.

3.2 Структура базы данных

秒杀数据库结构

3.3 Конфигурация сервера имен

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class MQNamesrvConfig {

    @Value("${rocketmq.nameServer.offline}")
    String offlineNamesrv;

    @Value("${rocketmq.nameServer.aliyun}")
    String aliyunNamesrv;

    /**
     * 根据环境选择nameServer地址
     * @return
     */
    public String nameSrvAddr() {
        String envType = System.getProperty("envType");
        //System.out.println(envType);
        if (StringUtils.isBlank(envType)) {
            throw new IllegalArgumentException("please insert envType");
        }
        switch (envType) {
            case "offline" : {
                return offlineNamesrv;
            }
            case "aliyun" : {
                return aliyunNamesrv;
            }
            default : {
                throw new IllegalArgumentException("please insert right envType, offline/aliyun");
            }
        }
    }
}

3.4 Протокол сообщений

Здесь самокодирование и декодирование протокола сообщения реализовано путем реализации шаблонных методов кодирования и декодирования BaseMsg (представляющих кодирование и декодирование сообщения соответственно) и путем установки свойств объекта this.

/**
 * @desc 基础协议类
 */
public abstract class BaseMsg {

    public Logger LOGGER = LoggerFactory.getLogger(this.getClass());

    /**版本号,默认1.0*/
    private String version = "1.0";
    /**主题名*/
    private String topicName;

    public abstract String encode();

    public abstract void decode(String msg);

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getTopicName() {
        return topicName;
    }

    public void setTopicName(String topicName) {
        this.topicName = topicName;
    }

    @Override
    public String toString() {
        return "BaseMsg{" +
                "version='" + version + '\'' +
                ", topicName='" + topicName + '\'' +
                '}';
    }
}
/**
 * @className OrderNofityProtocol
 * @desc 订单结果通知协议
 */
public class ChargeOrderMsgProtocol extends BaseMsg implements Serializable {

    private static final long serialVersionUID = 73717163386598209L;

    /**订单号*/
    private String orderId;
    /**用户下单手机号*/
    private String userPhoneNo;
    /**商品id*/
    private String prodId;
    /**用户交易金额*/
    private String chargeMoney;

    private Map<String, String> header;
    private Map<String, String> body;

    @Override
    public String encode() {
        // 组装消息协议头
        ImmutableMap.Builder headerBuilder = new ImmutableMap.Builder<String, String>()
                .put("version", this.getVersion())
                .put("topicName", MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic());
        header = headerBuilder.build();

        body = new ImmutableMap.Builder<String, String>()
                .put("orderId", this.getOrderId())
                .put("userPhoneNo", this.getUserPhoneNo())
                .put("prodId", this.getProdId())
                .put("chargeMoney", this.getChargeMoney())
                .build();

        ImmutableMap<String, Object> map = new ImmutableMap.Builder<String, Object>()
                .put("header", header)
                .put("body", body)
                .build();

        // 返回序列化消息Json串
        String ret_string = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            ret_string = objectMapper.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("ChargeOrderMsgProtocol消息序列化json异常", e);
        }
        return ret_string;
    }

    @Override
    public void decode(String msg) {
        Preconditions.checkNotNull(msg);
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode root = mapper.readTree(msg);
            // header
            this.setVersion(root.get("header").get("version").asText());
            this.setTopicName(root.get("header").get("topicName").asText());
            // body
            this.setOrderId(root.get("body").get("orderId").asText());
            this.setUserPhoneNo(root.get("body").get("userPhoneNo").asText());
            this.setChargeMoney(root.get("body").get("chargeMoney").asText());
            this.setProdId(root.get("body").get("prodId").asText());
        } catch (IOException e) {
            throw new RuntimeException("ChargeOrderMsgProtocol消息反序列化异常", e);
        }
    }

    public String getOrderId() {
        return orderId;
    }

    public ChargeOrderMsgProtocol setOrderId(String orderId) {
        this.orderId = orderId;
        return this;
    }

    public String getUserPhoneNo() {
        return userPhoneNo;
    }

    public ChargeOrderMsgProtocol setUserPhoneNo(String userPhoneNo) {
        this.userPhoneNo = userPhoneNo;
        return this;
    }

    public String getProdId() {
        return prodId;
    }

    public ChargeOrderMsgProtocol setProdId(String prodId) {
        this.prodId = prodId;
        return this;
    }

    public String getChargeMoney() {
        return chargeMoney;
    }

    public ChargeOrderMsgProtocol setChargeMoney(String chargeMoney) {
        this.chargeMoney = chargeMoney;
        return this;
    }

    @Override
    public String toString() {
        return "ChargeOrderMsgProtocol{" +
                "orderId='" + orderId + '\'' +
                ", userPhoneNo='" + userPhoneNo + '\'' +
                ", prodId='" + prodId + '\'' +
                ", chargeMoney='" + chargeMoney + '\'' +
                ", header=" + header +
                ", body=" + body +
                "} " + super.toString();
    }
}

3.5 Инициализация производителя Instant Kill Order

Загружается @PostConstruct (т.е. init())

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.gateway.common.config.MQNamesrvConfig;
import org.apache.rocketmq.gateway.common.util.LogExceptionWapper;
import org.apache.rocketmq.message.constant.MessageProtocolConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

/**
 * @className SecKillChargeOrderProducer
 * @desc 秒杀订单生产者初始化
 */
@Component
public class SecKillChargeOrderProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderProducer.class);

    @Autowired
    MQNamesrvConfig namesrvConfig;

    @Value("${rocketmq.acl.accesskey}")
    String aclAccessKey;

    @Value("${rocketmq.acl.accessSecret}")
    String aclAccessSecret;


    private DefaultMQProducer defaultMQProducer;

    @PostConstruct
    public void init() {
        defaultMQProducer =
                new DefaultMQProducer
                        (MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getProducerGroup(),
                                new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
        defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
        // 发送失败重试次数
        defaultMQProducer.setRetryTimesWhenSendFailed(3);
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            LOGGER.error("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
            throw new RuntimeException("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!", e);
        }
        LOGGER.info("[秒杀订单生产者]--SecKillChargeOrderProducer加载完成!");
    }

    public DefaultMQProducer getProducer() {
        return defaultMQProducer;
    }
}

3.6 Ввод заказа Seckill (производитель)

/**
* 平台下单接口
* @param chargeOrderRequest
* @return
*/
@RequestMapping(value = "charge.do", method = {RequestMethod.POST})
public @ResponseBody Result chargeOrder(@ModelAttribute ChargeOrderRequest chargeOrderRequest) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String sessionId = attributes.getSessionId();
// 下单前置参数校验
if (!secKillChargeService.checkParamsBeforeSecKillCharge(chargeOrderRequest, sessionId)) {
return Result.error(CodeMsg.PARAM_INVALID);
}
// 前置商品校验
String prodId = chargeOrderRequest.getProdId();
if (!secKillChargeService.checkProdConfigBeforeKillCharge(prodId, sessionId)) {
return Result.error(CodeMsg.PRODUCT_NOT_EXIST);
}
// 前置预减库存
if (!secKillProductConfig.preReduceProdStock(prodId)) {
return Result.error(CodeMsg.PRODUCT_STOCK_NOT_ENOUGH);
}
// 秒杀订单入队
return secKillChargeService.secKillOrderEnqueue(chargeOrderRequest, sessionId);
}

Режиссер:secKillChargeService::secKillOrderEnqueue

/**
 * 秒杀订单入队
 * @param chargeOrderRequest
 * @param sessionId
 * @return
 */
@Override
public Result secKillOrderEnqueue(ChargeOrderRequest chargeOrderRequest, String sessionId) {

    // 订单号生成,组装秒杀订单消息协议
    String orderId = UUID.randomUUID().toString();
    String phoneNo = chargeOrderRequest.getUserPhoneNum();
	
    //消息封装
    ChargeOrderMsgProtocol msgProtocol = new ChargeOrderMsgProtocol();
    msgProtocol.setUserPhoneNo(phoneNo)
        .setProdId(chargeOrderRequest.getProdId())
        .setChargeMoney(chargeOrderRequest.getChargePrice())
        .setOrderId(orderId);
    String msgBody = msgProtocol.encode();
    LOGGER.info("秒杀订单入队,消息协议={}", msgBody);

    DefaultMQProducer mqProducer = secKillChargeOrderProducer.getProducer();
    // 组装RocketMQ消息体
    Message message = new Message(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), msgBody.getBytes());
    try {
        // 消息发送
        SendResult sendResult = mqProducer.send(message);
        //判断SendStatus
        if (sendResult == null) {
            LOGGER.error("sessionId={},秒杀订单消息投递失败,下单失败.msgBody={},sendResult=null", sessionId, msgBody);
            return Result.error(CodeMsg.BIZ_ERROR);
        }
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            LOGGER.error("sessionId={},秒杀订单消息投递失败,下单失败.msgBody={},sendResult=null", sessionId, msgBody);
            return Result.error(CodeMsg.BIZ_ERROR);
        }
        ChargeOrderResponse chargeOrderResponse = new ChargeOrderResponse();
        BeanUtils.copyProperties(msgProtocol, chargeOrderResponse);
        LOGGER.info("sessionId={},秒杀订单消息投递成功,订单入队.出参chargeOrderResponse={},sendResult={}", sessionId, chargeOrderResponse.toString(), JSON.toJSONString(sendResult));
        return Result.success(CodeMsg.ORDER_INLINE, chargeOrderResponse);
    } catch (Exception e) {
        int sendRetryTimes = mqProducer.getRetryTimesWhenSendFailed();
        LOGGER.error("sessionId={},sendRetryTimes={},秒杀订单消息投递异常,下单失败.msgBody={},e={}", sessionId, sendRetryTimes, msgBody, LogExceptionWapper.getStackTrace(e));
    }
    return Result.error(CodeMsg.BIZ_ERROR);
}

3.7 Мгновенное потребление убийств

3.7.1 Определение клиента-потребителя

Мгновенно убивать клиентов, размещающих заказы

@Component
public class SecKillChargeOrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderConsumer.class);

    @Autowired
    MQNamesrvConfig namesrvConfig;

    @Value("${rocketmq.acl.accesskey}")
    String aclAccessKey;

    @Value("${rocketmq.acl.accessSecret}")
    String aclAccessSecret;

    private DefaultMQPushConsumer defaultMQPushConsumer;

    @Resource(name = "secKillChargeOrderListenerImpl")
    private MessageListenerConcurrently messageListener;

    @PostConstruct
    public void init() {
        defaultMQPushConsumer =
                new DefaultMQPushConsumer(
                    MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getConsumerGroup(),
                        new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
                        // 平均分配队列算法,hash
                        new AllocateMessageQueueAveragely());
        defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
        // 从头开始消费
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 消费模式:集群模式
        // 集群:同一条消息 只会被一个消费者节点消费到
        // 广播:同一条消息 每个消费者都会消费到
        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册监听器
        defaultMQPushConsumer.registerMessageListener(messageListener);
        // 设置每次拉取的消息量,默认为1
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
        // 订阅所有消息
        try {
            defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
            // 启动消费者
            defaultMQPushConsumer.start();
        } catch (MQClientException e) {
            LOGGER.error("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
            throw new RuntimeException("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!", e);
        }
        LOGGER.info("[秒杀下单消费者]--SecKillChargeOrderConsumer加载完成!");
    }
}

3.7.2 Реализовать базовую логику получения seckill

Реализовать логику ядра эквайринга seckill, то есть реализовать собственный MessageListenerConcurrently.

@Component
public class SecKillChargeOrderListenerImpl implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(SecKillChargeOrderListenerImpl.class);

    @Resource(name = "secKillOrderService")
    SecKillOrderService secKillOrderService;

    @Autowired
    SecKillProductService secKillProductService;

    /**
     * 秒杀核心消费逻辑
     * @param msgs
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                // 消息解码
                String message = new String(msg.getBody());
                int reconsumeTimes = msg.getReconsumeTimes();
                String msgId = msg.getMsgId();
                String logSuffix = ",msgId=" + msgId + ",reconsumeTimes=" + reconsumeTimes;
                LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-接收到消息,message={},{}", message, logSuffix);

                // 反序列化协议实体
                ChargeOrderMsgProtocol chargeOrderMsgProtocol = new ChargeOrderMsgProtocol();
                chargeOrderMsgProtocol.decode(message);
                LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-反序列化为秒杀入库订单实体chargeOrderMsgProtocol={},{}", chargeOrderMsgProtocol.toString(), logSuffix);

                // 消费幂等:查询orderId对应订单是否已存在
                String orderId = chargeOrderMsgProtocol.getOrderId();
                OrderInfoDobj orderInfoDobj = secKillOrderService.queryOrderInfoById(orderId);
                if (orderInfoDobj != null) {
                    LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-当前订单已入库,不需要重复消费!,orderId={},{}", orderId, logSuffix);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                // 业务幂等:同一个prodId+同一个userPhoneNo只有一个秒杀订单
                OrderInfoDO orderInfoDO = new OrderInfoDO();
                orderInfoDO.setProdId(chargeOrderMsgProtocol.getProdId())
                        .setUserPhoneNo(chargeOrderMsgProtocol.getUserPhoneNo());
                Result result = secKillOrderService.queryOrder(orderInfoDO);
                if (result != null && result.getCode().equals(CodeMsg.SUCCESS.getCode())) {
                    LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-当前用户={},秒杀的产品={}订单已存在,不得重复秒杀,orderId={}",
                            orderInfoDO.getUserPhoneNo(), orderInfoDO.getProdId(), orderId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                // 秒杀订单入库
                OrderInfoDO orderInfoDODB = new OrderInfoDO();
                BeanUtils.copyProperties(chargeOrderMsgProtocol, orderInfoDODB);

                // 库存校验
                String prodId = chargeOrderMsgProtocol.getProdId();
                SecKillProductDobj productDobj = secKillProductService.querySecKillProductByProdId(prodId);
                // 取库存校验
                int currentProdStock = productDobj.getProdStock();
                if (currentProdStock <= 0) {
                    LOGGER.info("[decreaseProdStock]当前商品已售罄,消息消费成功!prodId={},currStock={}", prodId, currentProdStock);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                // 正式下单
                if (secKillOrderService.chargeSecKillOrder(orderInfoDODB)) {
                    LOGGER.info("[秒杀订单消费者]-SecKillChargeOrderConsumer-秒杀订单入库成功,消息消费成功!,入库实体orderInfoDO={},{}", orderInfoDO.toString(), logSuffix);
                    // 模拟订单处理,直接修改订单状态为处理中
                    secKillOrderService.updateOrderStatusDealing(orderInfoDODB);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        } catch (Exception e) {
            LOGGER.info("[秒杀订单消费者]消费异常,e={}", LogExceptionWapper.getStackTrace(e));
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

3.7.3 Фактическое хранение второго убийства

Фактическая операция заказа и фактический вычет запасов находятся в одной и той же локальной транзакции.

/**
 * 秒杀订单入库
 * @param orderInfoDO
 * @return
 */
@Transactional(rollbackFor = Exception.class)
@Override
public boolean chargeSecKillOrder(OrderInfoDO orderInfoDO) {
    int insertCount = 0;
    String orderId = orderInfoDO.getOrderId();
    String prodId = orderInfoDO.getProdId();

    // 减库存
    if (!secKillProductService.decreaseProdStock(prodId)) {
        LOGGER.info("[insertSecKillOrder]orderId={},prodId={},下单前减库存失败,下单失败!", orderId, prodId);
        // TODO 此处可给用户发送通知,告知秒杀下单失败,原因:商品已售罄
        return false;
    }
    // 设置产品名称
    SecKillProductDobj productInfo = secKillProductService.querySecKillProductByProdId(prodId);
    orderInfoDO.setProdName(productInfo.getProdName());
    try {
        insertCount = secKillOrderMapper.insertSecKillOrder(orderInfoDO);
    } catch (Exception e) {
        LOGGER.error("[insertSecKillOrder]orderId={},秒杀订单入库[异常],事务回滚,e={}", orderId, LogExceptionWapper.getStackTrace(e));
        String message =
                String.format("[insertSecKillOrder]orderId=%s,秒杀订单入库[异常],事务回滚", orderId);
        throw new RuntimeException(message);
    }
    if (insertCount != 1) {
        LOGGER.error("[insertSecKillOrder]orderId={},秒杀订单入库[失败],事务回滚,e={}", orderId);
        String message =
                String.format("[insertSecKillOrder]orderId=%s,秒杀订单入库[失败],事务回滚", orderId);
        throw new RuntimeException(message);
    }
    return true;
}

4. Резюме и ссылки

резюме

Посмотрите еще раз на блок-схему и посмотрите на исходный код, если вы его не понимаете.

秒杀流程图

Для таких операций, как оплата и логистика после размещения заказа, RocketMQ можно использовать для асинхронной обработки.

Весь код в этой статье взят изИсходный код большого парня

Просто чтобы учиться и изучать реальный бой RocketMQ.

использованная литература