Анализ исходного кода системы распределенных транзакций Seata в режиме TCC

задняя часть исходный код

Что такое Сеата

SeataЭто платформа для распределенных транзакций, недавно открытая компанией Alibaba.github.com/seata/seata. Структура включает в себя группуTXC(Облачная версия называетсяGTS) и Ant FinancialTCCДва режима, всего несколько месяцевGithubВверхstarИх число приближается к 10 000, и в настоящее время это единственное решение для распределенных транзакций, одобренное крупными производителями.

TXCсуществуетSeataон жеATРежим, что означает, что метод компенсации автоматически генерируется платформой, которая полностью экранирована для пользователей. Пользователи могут использовать распределенные транзакции, как если бы они использовали локальные транзакции. Недостатком является то, что поддерживаются только реляционные базы данных (в настоящее время поддерживаютсяMySQL), представляяSeata ATСлужбе требуется локальное хранилище таблицrollback_info, Уровень изоляции по умолчаниюRUПрименимые сценарии ограничены.

TCCЭто не новая концепция, она существует уже давно, пользователи определяют ее поtry/confirm/cancelТри метода имитируют двухфазную фиксацию на уровне приложения, разница в том, что в TCCtryЭтот метод также должен управлять базой данных для блокировки ресурсов.Последующие два метода компенсации автоматически вызываются платформой для выполнения отправки и отката ресурсов соответственно, что аналогично простому уровню хранения.2PCНе совсем то же самое. Муравей ФинансовыйSeataвнес свой вкладTCCРеализация, говорят, что он развивался более десяти лет и широко используется в финансах, торговле, складировании и других областях.

История рождения распределенных транзакций

Ранние приложения имели единую архитектуру. Например, учетная запись, сумма и система заказов, связанные с платежными услугами, обрабатывались одним приложением. Нижний уровень имел доступ к одному и тому же экземпляру базы данных, а операции с естественными транзакциями также были локальными транзакциями.SpringЭто может быть легко реализовано, однако из-за увеличения масштабов один сервис необходимо разделить на три независимых сервиса.RPCКогда вызов сделан, данные также существуют в разных экземплярах базы данных.Поскольку бизнес-операция включает в себя изменение нескольких данных базы данных в это время, больше нельзя полагаться на локальные транзакции и можно решить только через структуру распределенных транзакций. .

TCC — это решение для распределенных транзакций, оно относится к типу гибкой компенсации, преимущество в том, что оно простое для понимания и толькоtryФазовая блокировка обеспечивает лучшую производительность параллелизма, но недостатком является стоимость преобразования кода.

Что такое TCC В этой статье мы не будем вдаваться в подробности, сама концепция TCC не сложна.

Как использовать SeataTCC

Прежде чем анализировать исходный код, мы кратко упомянем следующееSeata TCCИспользование шаблона полезно для последующего понимания всейTCCПроцесс.

Участники Seata TCC

SeataсерединаTCCТребования режимаTCCУчастники сервиса добавляются на интерфейс@TwoPhaseBusinessActionпримечание, примечаниеTCCимя интерфейса (глобально уникальное),TCCинтерфейсconfirmиcancelИмя метода, используемого для последующих вызовов отражения фреймворка, приведено ниже.TCCПример интерфейса:

public interface TccAction {
    @TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")
    public boolean try(BusinessActionContext businessActionContext, int a, int b);
    public boolean confirm(BusinessActionContext businessActionContext);
    public boolean cancel(BusinessActionContext businessActionContext);
}

Затем определите класс реализацииImplРеализуйте этот интерфейс, чтобы предоставить конкретные реализации для трех методов. Наконец, служба участника публикуется и регистрируется на удаленном конце, в основном для последующих действий.SeataФреймворк обращается к участникамconfirmилиcancelметод закрывает циклTCCДела.

Инициатор Seata TCC

Seata TCCИнициатор похож на нашу картинку вышеpayment service, участникам необходимо увеличить бизнес-метод@GlobalTransactionalАннотация, используемая для открытия глобальной транзакции регистрации аспекта, вызываемой в бизнес-методе.TCCнекоторые из сторонtryметод, как только бизнес-метод будет успешно вызван,SeataFramework будет информироватьTCПерезвоните эти партииconfirmиcancelметод.

Анализ исходного кода

SeataсерединаTCCИсходный код паттерна не сложен, в основном он ориентирован на:

module class Функции
seata-spring GlobalTransactionalInterceptor.class Глобальная логика аспекта транзакции, включая регистрацию глобальных транзакций и получение XID
seata-spring TccActionInterceptor.class Логика аспекта участника TCC
seata-tcc TCCResourceManager.class Разобрать TCC Bean и сохранить ресурсы TCC для последующих обратных вызовов.
seata-tcc ActionInterceptorHandler.class Реализация регистрации транзакций филиала ТСС
seata-server DefaultCoordinator.класс, FileTransactionStoreManager.класс В основном реализация TC, хранение транзакций и т.д.

Подпишитесь на ресурсы TCC

SeataОдинTCCинтерфейс называетсяTCC Resources, структура которого следующая:

public class TCCResource implements Resource {

    private String resourceGroupId = "DEFAULT";

    private String appName;

    private String actionName; // TCC 接口名称     

    private Object targetBean; // TCC Bean

    private Method prepareMethod; // try 方法

    private String commitMethodName;

    private Method commitMethod; // confirm 方法

    private String rollbackMethodName;

    private Method rollbackMethod; // cancel 方法

    // …… 省略
}

SeataРазбор для существования в приложенииTCC Bean, затем черезparserRemotingServiceInfoметод созданияTCCResourceобъект, а затем вызватьTCCResourceManagerКатегорияregisterResourceметод, будетTCCResourceОбъекты сохраняются локальноtccResourceCache, этоConcurrentHashMapструктура, при прохожденииRmRpcClientвTCCResourceизresourceId,addressПодождите, пока информация будет зарегистрирована на сервере для удобства последующих действий.TCпройти черезRPCПерезвоните по правильному адресу.

// 解析 TCCResource 的部分代码
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if(isService(bean, beanName)){
    try {
        // 如果是 TCC service Bean,解析并注册该 resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for(Method m : methods){
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if(twoPhaseBusinessAction != null){
                // 如果有 TCC 参与方注解,定义一个 TCCResource,
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                // TCC Bean
                tccResource.setTargetBean(targetBean); 
                // try 方法
                tccResource.setPrepareMethod(m); 
                // confirm 方法名称
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                // confirm 方法对象
                tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));
                // cancel 方法名称
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                // cancel 方法对象
                tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));
                // 调用到 TCCResourceManager 的 registerResource 方法
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    }catch (Throwable t){
        throw new FrameworkException(t, "parser remting service error");
    }
}

Давайте взглянемTCCResourceManagerизregisterResourceРеализация метода:

// 内存中保存的 resourceId 和 TCCResource 的映射关系
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();

@Override
public void registerResource(Resource resource) {
    TCCResource tccResource = (TCCResource) resource;
    tccResourceCache.put(tccResource.getResourceId(), tccResource);
    // 调用父类的方法通过 RPC 注册到远端
    super.registerResource(tccResource);
}

ПосмотримTCCResourceКак зарегистрироваться на сервере:

public void registerResource(Resource resource) {
    // 拿到 RmRpcClient 实例,调用其 registerResource 方法
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

public void registerResource(String resourceGroupId, String resourceId) {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register to RM resourceId:" + resourceId);
    }
    synchronized (channels) {
        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
            String serverAddress = entry.getKey();
            Channel rmChannel = entry.getValue();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("register resource, resourceId:" + resourceId);
            }
            // 注册 resourceId,远端将其解析为一个 RpcContext 保存在内存中
            sendRegisterMessage(serverAddress, rmChannel, resourceId);
        }
    }
}

GlobalTransaction регистрирует глобальные транзакции

GlobalTransactionАннотация — это точка входа в глобальную транзакцию, а ее аспектная логика реализована вGlobalTransactionalInterceptorв классе. Если принято решение войти@GlobalTransactionМодифицированный метод вызоветhandleGlobalTransactionМетод входит в логику аспекта, где ключевой методtransactionalTemplateизexecuteметод.

public Object execute(TransactionalExecutor business) throws Throwable {
    
    // 如果上游已经有 xid 传过来说明自己是下游,直接参与到这个全局事务中就可以,不必新开一个,角色是 Participant
    // 如果上游没有 xid 传递过来,说明自己是发起方,新开启一个全局事务,角色是 Launcher
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // …… …… 省略 

    try {

        // 开启全局事务
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // 调用业务方法
            rs = business.execute();

        } catch (Throwable ex) {

            // 如果抛异常,通知 TC 回滚全局事务
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // 如果不抛异常,通知 TC 提交全局事务
        commitTransaction(tx);

        return rs;
    } 

    // …… …… 省略
}

beginTransactionметод называетсяtransactionManagerизbeginметод:

// 客户端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 发送 RPC,获取 TC 下发的 xid
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

// 服务端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 全局事务用 GlobalSession 来表示
    GlobalSession session = GlobalSession.createGlobalSession(
        applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // 将 GlobalSession 写入文件存储
    session.begin();
    // 返回 UUID 作为全局事务 ID
    return XID.generateXID(session.getTransactionId());
}

TwoPhaseBusinessAction зарегистрировать транзакцию филиала

Когда глобальная транзакция вызывает бизнес-метод, он войдетTCCАспектная логика участников в основном реализуется вTccActionInterceptorкласс, ключевой методactionInterceptorHandlerизproceedметод.

public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
    
    // …… …… 省略

    // 创建分支事务
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);
    
    // 记录方法参数
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    
    // …… …… 省略
}

doTccActionLogStoreМетод отвечает за регистрацию транзакций филиала:

// 客户端
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    // 拿到全局事务 ID
    String xid = actionContext.getXid();
    
    // …… …… 省略

    try {
        // resourceManager 通过 RPC 向 TC 注册分支事务
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
        // 拿到 TC 返回的分支事务 ID
        return String.valueOf(branchId);
    }

    // …… …… 省略
}

// 服务端
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                            String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    // 分支事务用 BranchSession 表示,新建一个 BranchSession
    BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
        applicationData, lockKeys, clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        // 将分支事务加入全局事务中,也会写文件
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    // 返回分支事务 ID
    return branchSession.getBranchId();
}

Метод компенсации участника обратного звонка TC

Транзакция филиала зарегистрирована, и бизнес-метод успешно вызывается для уведомленияTCЗафиксируйте глобальную транзакцию.

@Override
public void commit() throws TransactionException {
    // 如果是参与者,无需发起提交请求
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    // 由 TM 向 TC 发出提交全局事务的请求
    status = transactionManager.commit(xid);
}

TCполученный клиентTMизcommitПосле запроса:

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // 根据 xid 找出 GlobalSession
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    GlobalStatus status = globalSession.getStatus();

    // 关闭这个 GlobalSession,不让后续的分支事务再注册上来
    globalSession.closeAndClean(); 

    if (status == GlobalStatus.Begin) {
        // 修改状态为提交进行中
        globalSession.changeStatus(GlobalStatus.Committing);
        // 一旦分支事务中存在 TCC,做同步提交,其实 TCC 分支也可以异步提交,要求高性能时可以选择异步
        if (globalSession.canBeCommittedAsync()) {
            asyncCommit(globalSession);
        } else {
            doGlobalCommit(globalSession, false);
        }
    }
    return globalSession.getStatus();
}

doGlobalCommitэто ключевой метод, на котором мы фокусируемся, и мы игнорируем вторичную логику в нем:

@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getSortedBranches()) {
        
        // …… …… 省略

        try {
            // 调用 DefaultCoordinator 的 branchCommit 方法做分支提交
            // 参数有分支事务 id,resourceId 用来寻找对应的 TCCResource 和补偿方法参数信息
            BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),
                XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                branchSession.getResourceId(), branchSession.getApplicationData());
        }
    }

    // …… …… 省略
}

серверная частьDefaultCoordinatorв классеbranchCommitметод испускаетRPCзапрос, звоните в соответствующуюTCCResourceпровайдер:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                    String applicationData)
    throws TransactionException {
    
    // …… …… 省略
    // 获取全局事务和分支事务
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
        BranchSession branchSession = globalSession.getBranch(branchId);
    // 根据 resourceId 找到对应的 channel 和 RpcContext 
    BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,
        branchSession.getClientId(), request);
    // 返回分支事务提交状态
    return response.getBranchStatus();

    // …… …… 省略
}

Клиент, естественно, получает фиксацию веткиRPCзапрос, а затем локально найти ранее проанализированный и сохраненныйTCCResourceВыполните вызов отражения метода компенсации. Ниже мы перехватываем ключевые шаги для анализа.

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // 根据 resourceId 找出内存中保留的 TCCResource 对象
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if(tccResource == null){
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // 获取 targetBean 和相应的 method 对象
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        boolean result = false;
        // 取出补偿方法参数信息
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // 反射调用补偿方法
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        // 返回状态
        return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
    // …… …… 省略
}

Транзакционное хранение

Что касается того, как модуль Seata TC выполняет хранение транзакций, в некоторых статьях в Интернете уже подробно объясняется, например,Углубленный анализ универсального решения для распределенных транзакций Seata-Server, поэтому здесь не повторяется.

Стоит упомянуть, что,TCМожно стать узким местом в производительности всей службы распределенных транзакций, так как это сделать高性能и高可用Важно, чтобы текущее хранилищеFile, код также имеет околоDB Store ModeизTODOэлемент, файл сравнивается сDBПроизводительность однозначно лучше, но юзабилити будет чуть хуже.Как это можно гарантированно ждать до доработки?HA ClusterВернитесь после публикации.

Суммировать

весьSeataРамка оTCCЧасть исходного кода не сложна. В этой статье только выбран ключевой код в некоторых классах для отображения, игнорируя некоторую логику суждений и обработку исключений. Автор считает, чтоSeata TCCоTCCТакже стоит обратить внимание на инкапсуляцию исключений и пользовательскую обработку, а также на дизайн различных скрытых точек расширения пользователя.

МуравейSOFA Channelя сделал предыдущийSeata TCC Поделиться Seata TCCЭто также упоминается в объяснении,TCCСложность фреймворка не в нем самом, а в том, как написать хорошийTCCИнтерфейс, если вас интересует эта часть, вы можете щелкнуть ссылку, чтобы узнать о ней больше.

напиши в конце

Это молодой человек в халате программиста, который время от времени обновляется. Не только делиться гиковскими технологиями, но и записывать фейерверки в мире.