Распределенная транзакция — такое простое решение RocketMQ

RocketMQ

предисловие

​ Для более популярной распределенной архитектуры, хотя она дает ряд преимуществ, таких как поддержка высокой степени параллелизма и высокой доступности кластеров. В то же время это также приносит ряд проблем, сегодня мы будем одной из них -Распределенная транзакция.

в традиционномall inСогласованность транзакций одного источника данных в проекте зависит от одномашинной транзакции, но если она поднимается до распределенного проекта, то согласованность транзакции не может быть достигнута только за счет опоры на одномашинную транзакцию. , он основан на распределенных транзакциях.

представлять

В настоящее время основные решения для распределенных транзакций в отрасли можно условно разделить на два типа.

  • сильная консистенция
  • окончательная согласованность

сильная консистенция

​ Основные решения представляют собой 2PC и Tcc, которые подходят для сценариев финансовых транзакций.

окончательная согласованность

​ Основное решение означает, что сообщения о транзакциях RocketMQ подходят для общих сценариев точечного заказа, 1. Например, создать заказ 2. Если заказ успешно создан 3. Увеличить баллы покупателя Независимо от того, что происходит в середине, пока заказ выполнен успешно, баллы покупателя гарантированно увеличиваются. Гарантированная окончательная согласованность

Архитектура реализации

Введение в терминологию

  • ПОЛОВИННОЕ СООБЩЕНИЕ: сообщение о транзакции, также известное как половинное сообщение, указывает, что сообщение находится в состоянии «временно невозможно доставить» и не будет использовано потребителем.После того, как сервер получит ответ фиксации или отката на сообщение от производителя , сообщение будет доставлено в обычном режиме или откатить (отбросить) сообщение
  • RMQ_SYS_TRANS_HALF_TOPIC : после того, как половинное сообщение будет доставлено на сервер Mq, оно будет сохранено в очереди потребления с темой RMQ_SYS_TRANS_HALF_TOPIC.
  • RMQ_SYS_TRANS_OP_HALF_TOPIC: после обработки половинного сообщения путем фиксации или отката оно будет сохранено в очереди с темой RMQ_SYS_TRANS_OP_HALF_TOPIC, что указывает на то, что половинное сообщение обработано.

В RocketMQ основной идеей является **двухэтапная отправка и регулярная проверка**.

Блок-схема выглядит следующим образом:

image-20190726114320498

1. Сначала инициатор транзакции отправляет полусообщение в RocketMQ

2. RocketMQ отвечает инициатору транзакции, и полусообщение успешно отправлено.

3. Инициатор транзакции отправляет локальную транзакцию

4. По результату локальной транзакции полусообщение RocketMQ является фиксацией или откатом.

5. Если уведомление на шаге 4 не получено, RocketMQ проверяет инициатора транзакции.

6. Инициатор транзакции получает уведомление о проверке, чтобы проверить статус локального сообщения.

7. Возвращаем результат проверки в RocketMQ и полу-сообщение коммита/отката в соответствии с результатом

8. Если брокер получает фиксацию, он отправит половинное сообщение из очереди trans_half в реальную бизнес-очередь. Если получен откат или истек срок действия половинного сообщения, оно отправляется в очередь trans_op_half.

9. Если полусообщение зафиксировано, метод подписки на сообщение может прочитать и использовать сообщение.Пока нисходящее потребление не удается и гарантируется повторная попытка, окончательная согласованность сообщения может быть гарантирована.

Проанализируйте возможные сценарии

1. Половинное сообщение успешно отправлено, но локальная транзакция не удалась. Откат — это полусообщение, и нижестоящий бизнес об этом не знает, что нормально.

2. Полусообщение успешно отправлено, и локальная транзакция успешно выполнена. Но шаг 4 информирует брокера о том, что отправка не удалась по сетевым причинам, но у брокера есть механизм опроса для запроса статуса локальной транзакции в соответствии с уникальным идентификатором, тем самым отправляя половину сообщений.

С помощью вышеуказанных шагов реализуется сообщение о транзакции RocketMQ.

Пример

Вот пример, чтобы поговорить о конкретном кодировании реализации распределенных транзакций RocketMQ.

Сценарий: в сценарии заказа после успешной оплаты заказа на соответствующей учетной записи соответствующего покупателя необходимо увеличить баллы. (Пока игнорируйте простой анализ логистических запасов.)

Очевидно две услуги, 1. Сервис заказа 2. Сервис баллов

После того, как пользователь оплатил, 1. Измените статус заказа и оплатил 2. Уведомите пункт службы, чтобы увеличить баллы для соответствующего покупателя.

структура объекта

Заказ

/**
 * @author yukong
 * @date 2019-07-25 15:18
 * 订单  省略其他字段
 */
@Data
public class Order {

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 买家id
     */
    private Integer buyerId;

    /**
     * 支付状态 0 已支付 1 未支付 2 已超时
     */
    private Integer payStatus;

    /**
     * 下单日期
     */
    private Date createDate;

    /**
     * 金额
     */
    private Long amount;



}

Баллы добавлены записи

/**
 * @author yukong
 * @date 2019-07-25 15:32
 * 积分添加记录表
 */
@Data
public class PointRecord {

    /**
     * 订单号
     */
    private String orderNo;

    /**
     * 用户id
     */
    private Integer userId;

}

Во-первых, нам нужно реализовать бизнес-код, который также должен изменить статус заказа, а затем записать запись о добавлении баллов (которую можно использовать для проверки транзакции, чтобы определить, разрешено ли успешное выполнение локальной транзакции).

/**
 * @author yukong
 * @date 2019-07-25 15:14
 */
@Service("payService")
@Slf4j
public class PayService {


    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private PointRecordMapper pointRecordMapper;

    /**
     * 支付功能:
     *  如果支付成功 则下游业务 也就是积分服务对应的账号需要增加积分
     *  如果支付失败,则下游业务无感知
     */
    @Transactional(rollbackFor = Exception.class)
    public void pay(String orderNo, Integer buyerId) {
        // 1、构造积分添加记录表
        PointRecord record = new PointRecord();
        record.setOrderNo(orderNo);
        record.setUserId(buyerId);
        // 2、存入数据库
        pointRecordMapper.insert(record);
         // 3、修改订单状态 为已支付
        Order order = new Order();
        order.setOrderNo(orderNo);
        order.setBuyerId(buyerId);
        //4、 更新订单信息
        orderMapper.updateOrder(order);

        log.info("执行本地事务,pay() ");
    }

    public Boolean checkPayStatus(String orderNo) {
        // 根据判断是否有PointRecord这个记录来 确实是否支付成成功 用于事务回查判断本地事务是否执行成功
        return Objects.nonNull(pointRecordMapper.getPointRecordByOrderNo(orderNo));
    }

}

Следующим шагом является реализация кода инициатора транзакции, который также является отправителем полусообщения.

/**
 * @author yukong
 * @date 2019-07-25 14:48
 * 事务消息生产者
 */
@Component
@Slf4j
public class TransactionProducer implements InitializingBean {

    private  TransactionMQProducer producer;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    @Autowired
    private TransactionListener transactionListener;


    /**
     * 构造生产者
     * @throws Exception
     */
    @Override
    public void afterPropertiesSet() throws Exception {

        producer = new TransactionMQProducer(rocketMQProperties.getTransactionProducerGroupName());
        producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr());
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory);
        producer.setExecutorService(executor);

        producer.setTransactionListener(transactionListener);

        producer.start();

    }

    /**
     * 真正的事物消息发送者
     */
    public void send() throws JsonProcessingException, UnsupportedEncodingException, MQClientException {

        ObjectMapper objectMapper = new ObjectMapper();

        // 模拟接受前台的支付请求
        String orderNo = UUID.randomUUID().toString();
        Integer userId = 1;
        // 构造发送的事务 消息
        PointRecord record = new PointRecord();
        record.setUserId(userId);
        record.setOrderNo(orderNo);

        Message message = new Message(rocketMQProperties.getTopic(), "", record.getOrderNo(),
                objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET));

        producer.sendMessageInTransaction(message, null);

        log.info("发送事务消息, topic = {}, body = {}", rocketMQProperties.getTopic(), record);
    }
}

Далее нам нужно реализовать кодирование двухэтапной фиксации сообщения о транзакции и просмотр локального статуса транзакции в сообщении о транзакции.

/**
 * @author yukong
 * @date 2019-07-25 15:08
 * 事务消息 回调监听器
 */
@Component
@Slf4j
public class PointTransactionListener implements TransactionListener {

    @Autowired
    private PayService payService;



    /**
     * 根据消息发送的结果 判断是否执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback
        ObjectMapper objectMapper = new ObjectMapper();
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        try {
            PointRecord record = objectMapper.readValue(message.getBody(), PointRecord.class);
            payService.pay(record.getOrderNo(), record.getUserId());
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (UnsupportedEncodingException e) {
            log.error("反序列化消息 不支持的字符编码:{}", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (IOException e) {
            log.error("反序列化消息失败 io异常:{}", e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        ObjectMapper objectMapper = new ObjectMapper();
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        PointRecord record = null;
        try {
            record = objectMapper.readValue(messageExt.getBody(), PointRecord.class);
        } catch (IOException e) {
            log.error("回调检查本地事务状态异常: ={}", e);

        }
        try {
            //根据是否有transaction_id对应转账记录 来判断事务是否执行成功
            boolean isCommit = payService.checkPayStatus(record.getOrderNo());
            if (isCommit) {
                state = LocalTransactionState.COMMIT_MESSAGE;
            } else {
                state = LocalTransactionState.ROLLBACK_MESSAGE;
            }
        } catch (Exception e) {
            log.error("回调检查本地事务状态异常: ={}", e);
        }
        return state;

    }
}

Таким образом, мы достигаем конечной согласованности распределенных транзакций.

Конкретный код потребителя не записывается, пока восходящая локальная транзакция выполняется успешно, а сообщение транзакции успешно доставлено в соответствующую тему, поэтому нижестоящий бизнес не знает о восходящем направлении, поэтому потребителю нужно только обеспечить идемпотентность.

Сегодня мы познакомимся с решением окончательной согласованности распределенных транзакций, реализованным RockerMQ.Заинтересованные дети могут подумать о том, как RocketMQ реализует фиксацию/откат полусообщения и как реализовать механизм проверки транзакции. Далее мы продолжим разбирать два других решения для распределенных транзакций.Если вас интересует детская обувь, то можете обратить внимание на мой официальный аккаунт.