Распределенная транзакция на основе RocketMQ — полный пример

Java

предисловие

Ранее мы говорили, что распределенные транзакции — это сложная техническая проблема. Не существует универсального решения, простых и эффективных средств.

Однако, если наша система не стремится к строгой согласованности, то наиболее часто используемой схемой является окончательная согласованность. Сегодня мы основываемся наRocketMQРеализовать программу новостей согласованности распределенных транзакций.

Код в этой статье — не просто демонстрация.Учитывая некоторые исключения, идемпотентное потребление и очереди недоставленных сообщений, постарайтесь приблизиться к надежным бизнес-сценариям.

Кроме того, в конце книги «RocketMQ Technology Insider» есть анализ процесса обработки ошибок примера кода распределенной транзакции, поэтому длина велика, и я надеюсь, что вы сможете терпеливо его просмотреть.

1. Деловые новости

Здесь я не хочу использовать много текста, чтобы вдаваться в подробности.RocketMQПринцип сообщений о транзакциях, нам нужно понять только две концепции.

  • Полусообщение, полусообщение

Временно недоступенConsumerИзрасходованные сообщения.Producerсообщение было отправлено наBrokerend, но состояние этого сообщения помечается как недоставленное, а сообщение в этом состоянии называется полусообщением. Фактически, сообщения в этом состоянии будут помещены вRMQ_SYS_TRANS_HALF_TOPICпод тему.

когдаProducerПосле того, как терминал подтвердит это дважды, т.е.Commitпосле,Consumerконец может быть потреблен; тогда, если онRollback, сообщение будет удалено и никогда не будет использовано.

  • Проверить статус транзакции

Мы думаем, что это может быть вызвано сетевыми причинами, проблемами приложений и т. д.ProducerТерминал не подтвердил это полусообщение, то в это времяBrokerСервер будет регулярно сканировать эти полусообщения и активно находитьProducerКлиент запрашивает статус сообщения.

Конечно, мы можем настроить, когда сканировать, в том числе сколько раз сканировать, и мы подробно поговорим об этом позже.

короче,RocketMQПринцип реализации сообщений о транзакциях основан на двухэтапной фиксации и просмотре состояния транзакции, чтобы определить, окончательно ли зафиксировано сообщение или отменено.

В этой статье наш код начинается с订单服务、积分服务Например. Исходя из вышеизложенного, общий процесс выглядит следующим образом:

2. Заказать услугу

В сервисе заказов мы получаем фронтальный запрос на создание заказа и сохранение соответствующих данных в локальную базу данных.

1. Таблица журнала транзакций

В сервисе заказов, помимо таблицы заказов, также требуется таблица журнала транзакций. Он определяется следующим образом:

CREATE TABLE `transaction_log` (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事务ID',
  `business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '业务标识',
  `foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '对应业务表中的主键',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

Эта таблица специально используется для проверки статуса транзакции. При отправке бизнес-данных в эту таблицу также вставляется часть данных, они сосуществуют в локальной транзакции. Запросите таблицу по идентификатору транзакции. Если возвращается запись, это доказывает, что локальная транзакция была зафиксирована; если запись не возвращается, локальная транзакция может находиться в неизвестном состоянии или в состоянии отката.

2. TransactionMQProducer

Мы знаем, что черезRocketMQЧтобы отправить сообщение, вам нужно сначала создать отправителя сообщения. Стоит отметить, что при отправке транзакционного сообщения экземпляр, который мы здесь создаем, должен бытьTransactionMQProducer.

@Component
public class TransactionProducer {
	
    private String producerGroup = "order_trans_group";
    private TransactionMQProducer producer;

    //用于执行本地事务和事务状态回查的监听器
    @Autowired
    OrderTransactionListener orderTransactionListener;
    //执行任务的线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
            
    @PostConstruct
    public void init(){
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendMsgTimeout(Integer.MAX_VALUE);
        producer.setExecutorService(executor);
        producer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    private void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //事务消息发送 
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.producer.sendMessageInTransaction(message, null);
    }
}

В приведенном выше коде главное — создать отправителя сообщения о транзакции. Здесь мы ориентируемся наOrderTransactionListener, который отвечает за выполнение локальных транзакций и просмотр состояния транзакций.

3. Прослушиватель транзакций ордеров

@Component
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    OrderService orderService;

    @Autowired
    TransactionLogService transactionLogService;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("开始执行本地事务....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            logger.info("本地事务已提交。{}",message.getTransactionId());
        }catch (Exception e){
            logger.info("执行本地事务失败。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        logger.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        logger.info("结束本地事务状态查询:{}",state);
        return state;
    }
}

проходя черезproducer.sendMessageInTransactionПосле отправки сообщения транзакции, если сообщение отправлено успешно, оно будет вызвано здесьexecuteLocalTransactionспособ выполнения локальной транзакции. Здесь он завершает вставку данных заказа и журнала транзакций.

Этот метод возвращает значениеLocalTransactionStateПредставляет локальное состояние транзакции, это класс перечисления.

public enum LocalTransactionState {
    //提交事务消息,消费者可以看到此消息
    COMMIT_MESSAGE,
    //回滚事务消息,消费者不会看到此消息
    ROLLBACK_MESSAGE,
    //事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚
    UNKNOW;
}

Так,checkLocalTransactionМетод используется для запроса статуса транзакции. Здесь мы запрашиваем идентификатор транзакцииtransaction_logВ этой таблице, если результат может быть запрошен, будет отправлено сообщение о транзакции, если нет, будет возвращен неизвестный статус.

Обратите внимание, что здесь есть еще одна проблема. Если он возвращает неизвестное состояние,RocketMQ BrokerСервер будет непрерывно проверять с интервалом в 1 минуту, пока не будет достигнуто максимальное количество проверок транзакции.Если статус транзакции не был проверен сверх этого числа, сообщение будет отменено.

Конечно, мы можем настроить частоту и максимальное количество охволожений транзакций. существуетBrokerстороны, вы можете настроить его следующим образом:

brokerConfig.setTransactionCheckInterval(10000); //回查频率10秒一次
brokerConfig.setTransactionCheckMax(3);  //最大检测次数为3

4. Класс бизнес-реализации

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    TransactionLogMapper transactionLogMapper;
    @Autowired
    TransactionProducer producer;

    Snowflake snowflake = new Snowflake(1,1);
    Logger logger = LoggerFactory.getLogger(this.getClass());

    //执行本地事务时调用,将订单数据和事务日志写入本地数据库
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){

        //1.创建订单
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);

        //2.写入事务日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);

        logger.info("订单创建完成。{}",orderDTO);
    }

    //前端调用,只用于向RocketMQ发送事务消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}

В классе бизнес-обслуживания заказов у ​​нас есть два метода. один дляRocketMQОтправляйте транзакционные сообщения, репозиторий реальных бизнес-данных.

Что касается того, почему это делается, то на самом деле есть некоторые причины, о которых мы поговорим позже.

5. звонок

@RestController
public class OrderController {

    @Autowired
    OrderService orderService;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostMapping("/create_order")
    public void createOrder(@RequestBody OrderDTO order) throws MQClientException {
        logger.info("接收到订单数据:{}",order.getCommodityCode());
        orderService.createOrder(order);
    }
}

6. Резюме

Бизнес-логика сервиса заказов завершена. Подытожим процесс следующим образом:

Если рассматривать исключения, то основные моменты здесь следующие:

  • Первый вызов createOrder отправляет сообщение о транзакции. Если отправка не удалась, что привело к ошибке, будет возвращено исключение, и в это время никакая защита данных не будет задействована.
  • Если сообщение о транзакции отправлено успешно, но при выполнении локальной транзакции возникает исключение, ни данные заказа, ни журнал транзакций не будут сохранены, поскольку они находятся в локальной транзакции.
  • Если локальная транзакция выполняется, но состояние локальной транзакции не возвращается вовремя или возвращается неизвестное состояние. Тогда это будетBrokerРегулярно проверяйте статус транзакций, а затем по таблице журнала транзакций можно определить, выполнен ли заказ, и записать его в базу данных.

Основываясь на этих элементах, мы можем сказать, что согласованность обслуживания заказов и сообщений о транзакциях гарантирована. Затем следующим шагом является то, как точечный сервис может правильно использовать данные заказа и выполнять соответствующие бизнес-операции.

3. Сервис баллов

В точечном сервисе в основном используются данные заказа, а затем в соответствии с содержанием заказа добавляются баллы соответствующим пользователям.

1. Таблица рекордов очков

CREATE TABLE `t_points` (
  `id` bigint(16) NOT NULL COMMENT '主键',
  `user_id` bigint(16) NOT NULL COMMENT '用户id',
  `order_no` bigint(16) NOT NULL COMMENT '订单编号',
  `points` int(4) NOT NULL COMMENT '积分',
  `remarks` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '备注',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

Здесь мы ориентируемся наorder_noполе, которое является вариантом для достижения идемпотентного потребления.

2. Потребительский старт

@Component
public class Consumer {

    String consumerGroup = "consumer-group";
    DefaultMQPushConsumer consumer;

    @Autowired
    OrderListener orderListener;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order","*");
        consumer.registerMessageListener(orderListener);
        consumer.start();
    }
}

Запустить потребителя относительно просто, мы указываем, какой потреблятьtopicи слушатель в порядке.

3. Потребительский слушатель

@Component
public class OrderListener implements MessageListenerConcurrently {

    @Autowired
    PointsService pointsService;
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        logger.info("消费者线程监听到消息。");
        try{
            for (MessageExt message:list) {
                logger.info("开始处理订单数据,准备增加积分....");
                OrderDTO order  = JSONObject.parseObject(message.getBody(), OrderDTO.class);
                pointsService.increasePoints(order);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            logger.error("处理消费者数据发生异常。{}",e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}

Прослушав сообщение, вызовите класс бизнес-обслуживания для его обработки. Возврат после завершения обработкиCONSUME_SUCCESSотправить, если обработка не удалась, вернутьRECONSUME_LATERпопробовать еще раз.

4. Увеличьте точки

Здесь главное хранить интегральные данные. Но обратите внимание, что вам нужно выносить суждения перед входом в библиотеку, чтобы достичь идемпотентного потребления.

@Service
public class PointsServiceImpl implements PointsService {

    @Autowired
    PointsMapper pointsMapper;

    Snowflake snowflake = new Snowflake(1,1);
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void increasePoints(OrderDTO order) {
		
        //入库之前先查询,实现幂等
        if (pointsMapper.getByOrderNo(order.getOrderNo())>0){
            logger.info("积分添加完成,订单已处理。{}",order.getOrderNo());
        }else{
            Points points = new Points();
            points.setId(snowflake.nextId());
            points.setUserId(order.getUserId());
            points.setOrderNo(order.getOrderNo());
            Double amount = order.getAmount();
            points.setPoints(amount.intValue()*10);
            points.setRemarks("商品消费共【"+order.getAmount()+"】元,获得积分"+points.getPoints());
            pointsMapper.insert(points);
            logger.info("已为订单号码{}增加积分。",points.getOrderNo());
        }
    }
}

5. Идемпотентное потребление

Есть много способов добиться идемпотентного потребления, как это сделать, зависит от вашей ситуации.

Например, в этом примере мы напрямую связываем номер заказа и запись точек в той же таблице, а перед добавлением точек мы можем проверить, был ли заказ обработан.

В качестве альтернативы мы также можем создать дополнительную таблицу для записи обработки заказа.

Кроме того, вы можете поместить эту информацию непосредственно вredisВ кэше кэш запрашивается перед хранением.

Независимо от того, как это сделать, общая идея состоит в том, чтобы проверить, было ли сообщение обработано, прежде чем выполнять бизнес. Итак, вот проблема первичного ключа данных. В этом примере мы используем номер заказа в качестве первичного ключа, и мы также можем использовать идентификатор транзакции в качестве первичного ключа. Если это обычное сообщение, мы также можем создать уникальное сообщение. ID в качестве первичного ключа.

6. Ненормальное потребление

Мы знаем, что когда потребитель не сможет обработать, он вернетRECONSUME_LATER, пусть сообщение будет повторено, по умолчанию до 16 повторов.

Что же делать, если сообщение не было обработано должным образом по особым причинам?

Мы рассматриваем два пути решения этой проблемы.

Во-первых, установите количество повторных попыток сообщения в коде.Если указанное количество раз будет достигнуто, будет отправлено электронное письмо или SMS, чтобы уведомить бизнес-сторону о ручном вмешательстве в обработку.

@Component
public class OrderListener implements MessageListenerConcurrently {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        logger.info("消费者线程监听到消息。");
        for (MessageExt message:list) {
            if (!processor(message)){
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * 消息处理,第3次处理失败后,发送邮件通知人工介入
     * @param message
     * @return
     */
    private boolean processor(MessageExt message){
        String body = new String(message.getBody());
        try {
            logger.info("消息处理....{}",body);
            int k = 1/0;
            return true;
        }catch (Exception e){
            if(message.getReconsumeTimes()>=3){
                logger.error("消息重试已达最大次数,将通知业务人员排查问题。{}",message.getMsgId());
                sendMail(message);
                return true;
            }
            return false;
        }
    }
}

Во-вторых, подождите, пока сообщение повторит максимальное количество попыток, прежде чем попасть в очередь недоставленных сообщений.

Максимальное количество повторных попыток сообщения по умолчанию составляет 16 раз, и мы также можем установить это количество раз на стороне потребителя.

consumer.setMaxReconsumeTimes(3);//设置消息重试最大次数

Название темы очереди недоставленных сообщений:%DLQ% + 消费者组名称, например, в данных заказа мы задаем имя группы потребителей:

String consumerGroup = "order-consumer-group";

Тогда этот потребитель, соответствующее имя темы очереди недоставленных сообщений%DLQ%order-consumer-group

Как показано выше, нам также нужно нажатьTOPIC配置, чтобы изменитьpermсвойство, измените его на 6.

Наконец, вы можете отслеживать эту тему через программный код, чтобы уведомить о ручном вмешательстве или непосредственно просматривать обработку на консоли. Благодаря идемпотентному потреблению и обработке недоставленных сообщений в основном гарантируется, что сообщение будет обработано.

В-четвертых, пример кода в RocketMQ Technology Insider.

У автора есть книга RocketMQ Technology Insider, а в главе 9.4 есть фрагмент кода распределенной транзакции.

Однако, прочитав ее, автор чувствует, что в ней есть проблема с процессом, которая вызовет нестыковки в местных делах, и разберем ее ниже.

Здесь мы в основном фокусируемся на процессе заказа бизнес-класса обслуживания и прослушивателе транзакций в книге.

В книге псевдокод для размещения заказа выглядит следующим образом:

public Map createOrder(){
    Map result = new HashMap();
    //执行下订单相关的业务流程,例如操作本地数据库落库相关代码
    //生成事务消息唯一业务标识,将该业务标识组装到待发送的消息体中,方便消息端进行幂等消费。
    //调用消息客户端API,发送事务prepare消息。
    //返回结果,提交事务
    return result;
}

Вышеупомянутый первый шаг, отправка сообщения о транзакции, вам нужно реализовать TransactionListener, чтобы добиться реализации местных дел и местных дел обратно, чтобы проверить.

public class OrderTransactionListenerImpl implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {

        //从消息体中获取业务唯一ID
        String bizUniNo = message.getUserProperty("bizUniNo");
        //将bizUniNo入库,表名:t_message_transaction,表结构 bizUniNo(主键),业务类型。
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt message) {
        //从消息体中获取业务唯一ID
        String bizUniNo = message.getUserProperty("bizUniNo");
        //如果本地事务表(t_message_transaction)存在记录,则认为提交;如果不存在返回未知。
        //如果多次回查还是未查到消息,则回滚。
        if (query(bizUniNo)>0){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else{
            return LocalTransactionState.UNKNOW;
        }
    }
    //查询数据库是否存在记录
    public int query(String bizUniNo){
        //select count(1) from t_message_transaction where biz_uni_no = #{bizUniNo}
        return 1;
    }
}

Приведенный выше код скопирован автором в этой книге.Если следовать этому методу, то действительно есть проблема.Давайте проанализируем его.

1. Порядок ненормальный

Рассмотрим приведенный выше псевдокод размещения заказа, который содержит две операции: складирование заказа и отправку сообщения о транзакции.

Итак, продолжаем думать:

  • Если возникнет исключение при помещении заказа на склад, это не проблема, т.к. сообщение о транзакции отправлено не будет;
  • Если складирование заказа завершено, но сообщение о транзакции отправлено и сообщается об ошибке. Это не проблема, данные заказа будут откатываться;
  • Если выполнение складирования заказа завершено, то ошибки в отправке сообщения о транзакции нет. но возвращается неSEND_OKСтатус, это проблематично.

Поскольку только сообщение транзакции отправки является успешным, а статус отправкиSEND_OK, будет выполнена локальная транзакция в слушателе, чтобыt_message_transactionТаблицы записываются в журнал транзакций.

Дальше будет сцена: введены локальные данные заказа, а потому возврата нетSEND_OKсостояние, из-за чего журнал транзакций в локальной транзакции не выполняется. Потом это сообщение о транзакции будет рано или поздно откатываться.Последняя проблема заключается в том, что пользователь успешно разместил заказ, но баллы не начислились.

2. Исключение выполнения локальной транзакции

На самом деле, первую проблему тоже можно обойти. То есть после отправки сообщения о транзакции оценивается, является ли состояние отправкиSEND_OK, если нет, откатить данные заказа, выдав исключение.

Однако есть вторая проблема:

Что делать, если данные заказа и сообщения о транзакциях отправляются, но что делать при выполнении локальных транзакций?

Если это так, это также приведет к хранению данных локального заказа, но журнал транзакций не был записан. Эта запись не может быть запрошена, когда статус транзакции проверяется, и, наконец, сообщение транзакции может быть только откатывается только Отказ Конечное явление также является то, что пользователь успешно разместил заказ, но не увеличил точки.

Но в книге у автора есть такой отрывок:

executeLocalTransaction, этот метод в основном устанавливает состояние локальной транзакции в транзакции с бизнес-кодом. Например, вOrderService#createOrder, если локальная транзакция успешно зафиксирована, этот метод также будет успешно зафиксирован. Поэтому здесь в основном кt_message_transactionДобавьте запись, и когда транзакция проверяется, если есть запись, считается, что сообщение нужно отправить.

Я понимаю, что автор имеет в виду в этом отрывке то, что все они касаются местного дела. еслиcreateOrderСпособ успешно выполняется, тоexecuteLocalTransactionСпособ также успешно выполняется; если какая-либо сторона не удается, транзакция возвращается назад.

Однако, если мы анализируем из исходного кода, если локальное выполнение транзакции сообщает об ошибке, данные заказа не будут откатываться.

3. Анализ исходного кода

Во-первых, нам нужно знать,executeLocalTransactionМетоды иcreateOrderМетод действительно находится в транзакции.

Это потому чтоexecuteLocalTransactionМетоды вызываются синхронно после отправки сообщения транзакции, поэтому они находятся в транзакции.

Давайте посмотрим на процесс отправки сообщений о транзакциях в исходном коде:

public TransactionSendResult sendMessageInTransaction(Message msg, 
                        LocalTransactionExecuter localTransactionExecuter, 
                        Object arg)throws MQClientException {
	
    //发送事务消息返回结果
    SendResult sendResult = null;
    //如果发送消息失败,抛出异常
    try {
    	sendResult = this.send(msg);
    } catch (Exception var11) {
    	throw new MQClientException("send message Exception", var11);
    }
    //初始化本地事务状态:未知状态
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch(sendResult.getSendStatus()) {
    //如果发送事务消息状态为send_ok
    case SEND_OK:
        try {
            //执行本地事务方法
            if (transactionListener != null) {
                this.log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
        } catch (Throwable var10) {
            this.log.info("executeLocalTransactionBranch exception", var10);
            this.log.info(msg.toString());
            localException = var10;
        }
        break;
	//如果发送事务状态不是send_ok,该事务消息会被回滚
	case FLUSH_DISK_TIMEOUT:
	case FLUSH_SLAVE_TIMEOUT:
	case SLAVE_NOT_AVAILABLE:
	    localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
	}
	//结束事务,就是根据本地事务状态,执行提交、回滚或暂不处理事务
	try {
	    this.endTransaction(sendResult, localTransactionState, localException);
	} catch (Exception var9) {
	    this.log.warn("", var9);
	}
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

Приведенный выше код представляет собой процесс отправки сообщений о транзакциях. Давайте сосредоточимся на этом, если сообщение о транзакции отправлено успешно и статус возвратаSEND_OK, затем запустите прослушиватель вexecuteLocalTransactionметоды, что означает, что они находятся в транзакции.

Однако во время выполнения он вручную ловитThrowableаномальный. Это означает, что даже если выполнение локальной транзакции завершится ошибкой, откат не будет запущен.

До сих пор мы очень ясно давали понять, что если мы напишем код в соответствии с процессом, описанным в книге, эта часть станет скрытой опасностью.

Если мы хотим избежать этой проблемы, мы можем только изменитьrocket-clientкод, например:

try {
    //执行本地事务方法
    if (transactionListener != null) {
        this.log.debug("Used new transaction API");
        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
    }
} catch (Throwable var10) {
    this.log.info("executeLocalTransactionBranch exception", var10);
    this.log.info(msg.toString());
    localException = var10;
    throw new MQClientException(e.getMessage(),e);
}   

Автор модифицировал исходный код и протестировал его, так же возможно и генерирование исключений вручную. Таким образом, если при выполнении локальной транзакции произойдет ошибка, данные заказа также будут отброшены.

На данный момент вы можете ответить на вопрос в главе 2.4 этой статьи:

Почему в заказе бизнес-услуги должен иметь два метода. Один используется для отправки сообщений транзакций в RocketMQ, а другой используется для хранения реальных бизнес-данных.

Суммировать

Эта статья посвященаRocketMQСлучай распределенной транзакции для достижения окончательной согласованности.

Кроме того, я также рассказал об образце кода распределенной транзакции в книге «RocketMQ Technology Insider» и возможных нештатных проблемах. По этому пункту я также надеюсь, что друзья с разными взглядами будут активно оставлять сообщения и общаться друг с другом.