Углубленный анализ универсального решения для распределенных транзакций Seata-Client

Java

1. Предпосылки

В предыдущих статьях было представлено общее введение Seata, как его использовать и принципиальный анализ Seata-Server.Если вам интересно, вы можете прочитать следующие статьи:

В этой статье будут представлены две другие важные роли в Seata.TM(менеджер по сделкам) иRM(Исследователь), сначала давайте посмотрим на следующую картинку:

предыдущая статья дляTCПринцип подробно описан, для ТМ и РМ мы видим, что они принадлежатclientроли, их соответствующие функции заключаются в следующем:

  • TM(Диспетчер транзакций): используется для управления всей распределенной транзакцией и инициирования глобальных транзакций.Begin/Commit/Rollback.
  • RM(资源管理器): используется для регистрации вашей собственной транзакции филиала, принятьTCизCommitилиRollbackпросить.

2.Seata-Spring

Сначала давайте представим некоторыеSeata-clientсерединаSpringмодуль,SeataЧерез этот модуль к себеTMа такжеRMВыполните инициализацию и сканирование аннотаций для режима AT и режима TCC, а также инициализируйте ресурсы, необходимые для этих режимов. существуетSeataодин из проектовspringмодуль, который содержит наши иspringродственная логика,GlobalTransactionScannerэто основной класс:

public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean,ApplicationContextAware,
        DisposableBean

Приведенный выше код является определением класса, в первую очередь он наследуетAbstractAutoProxyCreatorДостигнутоwrapIfNecessaryметод реализует аспектный прокси для нашего метода, который реализуетInitializingBeanИнтерфейс используется для инициализации нашего клиента, который реализуетApplicationContextAwareчтобы спасти нашуspringконтейнер, реализованныйDisposableBeanдля плавного выключения.

Сначала взгляните на wrapIfNecessary, который наследует реализацию AbstractAutoProxyCreator.

    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (PROXYED_SET.contains(beanName)) {
            return bean;
        }
        interceptor = null;
        //check TCC proxy
        if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
            //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
            interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        } else {
            Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
            Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
            if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                return bean;
            }
            if (interceptor == null) {
                interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            }
        }
        if (!AopUtils.isAopProxy(bean)) {
            bean = super.wrapIfNecessary(bean, beanName, cacheKey);
        } else {
            AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
            Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
            for (Advisor avr : advisor) {
                advised.addAdvisor(0, avr);
            }
        }
        PROXYED_SET.add(beanName);   
        return bean;
    }
  • Шаг 1: проверьте текущийbeanNameРазобрались ли с этим? Если с этим разобрались на этот раз, с ним не разберутся.
  • Шаг 2: По аннотации найдите соответствующий режимInteceptor, тут три случая первыйTCC, второй - перехватчик глобальной ТМ управления транзакциями, третий - нет аннотации, если нет, то вернитесь напрямую.
  • Шаг 3: Поместите соответствующийinterceptorдобавить к текущемуBean.

Тогда посмотрите изInitializingBeanреализовано вafterPropertiesSet, это верноSeataинициализация:

    public void afterPropertiesSet() {
        initClient();

    }
    private void initClient() {
     
        //init TM
        TMClient.init(applicationId, txServiceGroup);
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        registerSpringShutdownHook();
    }
    private void registerSpringShutdownHook() {
        if (applicationContext instanceof ConfigurableApplicationContext) {
            ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
            ShutdownHook.removeRuntimeShutdownHook();
        }
        ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
        ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
    }    

Логика приведенного выше кода относительно ясна:

  • Шаг 1: ИнициализироватьTMклиент, сюда будут отправленыServerзарегистрироватьTM.
  • Шаг 2: ИнициализироватьRMКлиент, здесь будет зарегистрирован сервер с серверомRM.
  • Шаг 3: ЗарегистрируйтесьShutdownHook, за которым последуетTMа такжеRMЗакройте изящно.

Обратите внимание, что при инициализации здесь будут инициализированы два клиента, а именноTMклиент иRMклиент, думают многиеTMа такжеRMЭто тот же клиент, который используется, поэтому вам нужно обратить внимание на это.

2.1 Interceptor

В первой части приведенной выше логики мы видим, что у нас есть два бизнес-ядра.Interceptor,одинGlobalTransactionalInterceptorИспользуется для управления глобальными транзакциями (открытие, фиксация, откат), другойTccActionInterceptorИспользуется для обработки режима TCC. Знакомые с Seata друзья спросят про режим AT, почему только режим TCC, здесь режим AT представляет собой автоматическую обработку транзакций, нам не нужны аспекты

2.1.1 GlobalTransactionalInterceptor

Сначала взгляните на GlobalTransactionalInterceptor#invoke:

    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
  • Шаг 1: Получите оригинал из прокси-классаMethod
  • Шаг 2: получитьMethodаннотации в
  • Шаг 3: Если есть@GlobalTransactionalАннотация выполняет логику аспекта handleGlobalTransaction, которая также является логикой нашей глобальной транзакции.
  • Шаг 4: Если есть@GlobalLockАннотация, выполняется логика аспекта handleGlobalLock. Эта аннотация используется для некоторой блокировки базы данных, отличной от режима AT. После добавления этой аннотации он будет запрашивать, заблокированы ли соответствующие данные перед выполнением инструкции Sql, но не присоединится к глобальной транзакции. .

handleGlobalTransactionЛогика следующая:

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                           final GlobalTransactional globalTrxAnno) throws Throwable {

        return transactionalTemplate.execute(new TransactionalExecutor() {

            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

        });
    }
    TransactionalTemplate#execute
        public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {
                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }
            // 4. everything is fine, commit.
            commitTransaction(tx);
            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

существуетhandleGlobalTransactionЛейтенант даст конкретную реализациюTransactionalTemplate#executeсделано, конкретные шаги заключаются в следующем:

  • Шаг 1: Получите текущую глобальную транзакцию, если нет, создайте ее.
  • Шаг 2: Получите информацию о транзакциях в бизнесе, включая время ожидания и т. д.
  • Шаг 3: Откройте глобальную транзакцию
  • Шаг 4: Если для обработки исключения было создано исключение, выполните откат.
  • Шаг 5: Если исключения нет, зафиксируйте глобальную транзакцию.
  • Шаг 6: Очистите текущую информацию о контексте транзакции.

2.1.2 TccActionInterceptor

Давайте сначала посмотрим, как используется TccActionInterceptor:

    @TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
    public boolean prepare(BusinessActionContext actionContext, int a);

    public boolean commit(BusinessActionContext actionContext);
    
    public boolean rollback(BusinessActionContext actionContext);

Вообще говоря, будет определено три метода, один — это метод try этапа, а другой — фиксация и откат второго этапа.Первый параметр каждого метода — контекст нашей транзакции.Здесь нам не нужно заботиться что он будет самодостаточным в нашем аспекте.

Далее давайте посмотрим, как обрабатываются перехватчики, связанные с TCC:

public Object invoke(final MethodInvocation invocation) throws Throwable {
		Method method = getActionInterfaceMethod(invocation);
		TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);	
		//try method
	    if(businessAction != null) {
			if(StringUtils.isBlank(RootContext.getXID())){
				//not in distribute transaction
				return invocation.proceed();
			}
	    	Object[] methodArgs = invocation.getArguments();
	    	//Handler the TCC Aspect
			Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, businessAction, new Callback<Object>(){
				@Override
				public Object execute() throws Throwable {
					return invocation.proceed();
				}
	    	});
	    	//return the final result
	    	return ret.get(Constants.TCC_METHOD_RESULT);
	    }
		return invocation.proceed();
	}
  • Шаг 1: Получите оригиналMethod.
  • Шаг 2: Определите, входит ли он в глобальную транзакцию, то есть выполняется ли самый внешний уровень всей логической службыGlobalTransactionalInterceptor. Если он больше не выполняется напрямую.
  • Шаг 3: ВыполнитьTCCАспект, основная логика находится вactionInterceptorHandler#proceedсередина.

посмотри сноваactionInterceptorHandler#proceedСюда:

 public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
		Map<String, Object> ret = new HashMap<String, Object>(16);
		
		//TCC name
        String actionName = businessAction.name();
        String xid = RootContext.getXID();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        //set action anme
        actionContext.setActionName(actionName)

        //Creating Branch Record
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);
        
        //set the parameter whose type is BusinessActionContext
        Class<?>[] types = method.getParameterTypes();
        int argIndex = 0;
        for (Class<?> cls : types) {
            if (cls.getName().equals(BusinessActionContext.class.getName())) {
            	arguments[argIndex] = actionContext;
                break;
            }
            argIndex++;
        }
        //the final parameters of the try method
        ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
        //the final result
        ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
        return ret;
	}

  • Шаг 1: Получите некоторую информацию о транзакции, такую ​​какTCCимя, эта транзакцияXIDЖдать.
  • Шаг 2: СоздайтеBranchтранзакции, одна местнаяcontextпоместите это в контекстcommitа такжеrollbackинформация хранится, другая для нашегоSeata-ServerЗарегистрируйте транзакции филиала для последующего управления.
  • Шаг 3: Заполните параметры метода, который является нашимBusinessActionContext.

2.2 Резюме

Было проанализировано несколько общих компонентов Spring.Основных классов в основном три, одинScanner,дваInterceptor. В целом это относительно просто. По сути, то, что делает Spring, это некоторая инициализация нашего клиента. Далее, давайте более подробно рассмотрим роль TM.

3. Менеджер транзакций ТМ

В предыдущей главе мы говорили оGlobalTransactionalInterceptorВ этом аспекте перехватчика мы знаем, что этот перехватчик делает то, что должен делать наш TM, например, открытие транзакции, фиксацию транзакции и откат транзакции. Это только отправная точка нашей общей логики. Конкретная клиентская логика находится в нашем DefaultTransactionManager. Код в этом классе выглядит следующим образом:

public class DefaultTransactionManager implements TransactionManager {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        return response.getXid();
    }

    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
        globalRollback.setXid(xid);
        GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
        return response.getGlobalStatus();
    }

    @Override
    public GlobalStatus getStatus(String xid) throws TransactionException {
        GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
        queryGlobalStatus.setXid(xid);
        GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TransactionException(TransactionExceptionCode.IO, toe);
        }
    }
}

существуетDefaultTransactionManagerОбщая логика относительно проста, и есть четыре метода:

  • beginServerположить началоGlobalBeginRequestЗапрос на запуск глобальной транзакции.
  • commitServerположить началоGlobalCommitRequestЗапрос на совершение глобальной транзакции.
  • rollbackServerположить началоGlobalRollbackRequestЗапрос на откат глобальной транзакции.
  • getStatusServerположить началоGlobalStatusRequestЗапрос на запрос глобальной информации о статусе транзакции.

4. Обозреватель РМ

существуетSeataтекущий менеджментRMЕсть два режима: одинATРежим, который требует поддержки транзакционной базы данных, будет автоматически записывать моментальный снимок до модификации и моментальный снимок после модификации для фиксации и отката.TCCрежим, который также можно рассматривать какMTрежим, для случаев, когда режим AT не поддерживается, выполните фиксацию и откат вручную. Далее мы подробно проанализируем принципы реализации этих двух режимов.

4.1 Управление ресурсами АТ

ATнужно использовать в режимеSeataОбщая логика реализации предоставленного агента источника данных показана на следующем рисунке:

В нашей программе выполнитьsqlутверждение, используете ли выmybatis, или использовать напрямуюjdbcTemplate, выполните следующие действия:

  • Шаг 1: Получите подключение к базе данных из источника данных.
  • Шаг 2: Получите его из соединенияStatement.
  • Шаг 3: Выполните наше заявление через Заявлениеsqlутверждение

Таким образом, мы можем положитьDataSource,Connection,StatementПрокси, а затем выполнить некоторые из нашей специальной логики, чтобы завершить наш режим AT.

4.1.1 DataSourceProxy

В DataSourceProxy не так много бизнес-логики, просто получитьConnectionиспользуйте нашConnectionProxyПрокси-класс инкапсулирован, и код выглядит следующим образом:

    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

Сначала через наш прокси доDataSourceполучить соединение, затем использоватьConnectionProxyпрокси это.

4.1.2 ConnectionProxy

ConnectionProxyВ основном делайте три вещи, первая — генерировать агентовStatement, второе — сохранить наш контекст соединения: заблокированный ключ, undoLog и т. д., третье — прокси для выполнения нашей локальной транзакцииcommitа такжеrollback.

Сначала посмотрите на сгенерированный проксиStatement:

    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }

Здесь также напрямую генерируется через наше исходное соединениеStatement, а затем прокси.

Далее, давайте взглянем на управление нашим контекстом.Всем известно, что одна из наших транзакций на самом деле соответствует соединению с базой данных.sqlизundologа такжеlockKeyрегистрируются в контексте соединения. Как показано в следующем коде:

    /**
     * append sqlUndoLog
     *
     * @param sqlUndoLog the sql undo log
     */
    public void appendUndoLog(SQLUndoLog sqlUndoLog) {
        context.appendUndoItem(sqlUndoLog);
    }

    /**
     * append lockKey
     *
     * @param lockKey the lock key
     */
    public void appendLockKey(String lockKey) {
        context.appendLockKey(lockKey);
    }

Код здесь простой,lockKeyа такжеundologвсе используютlistсохранить, напрямуюaddВот и все.

Когда наша локальная транзакция завершена, нам нужно вызватьConnectionизcommitилиrollbackдля фиксации или отката транзакции. Здесь нам также нужно проксировать эти два метода, чтобы завершить нашу обработку транзакций филиала, давайте сначала посмотримcommitметод.

    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e);
        }
        try {
            if (context.hasUndoLog()) {
                UndoLogManager.flushUndoLogs(this);
            }
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            if (ex instanceof SQLException) {
                throw new SQLException(ex);
            }
        }
        report(true);
        context.reset();
    }
    
  • Шаг 1: СуждениеcontextВходит ли он в глобальную транзакцию, если да, отправьте его и перейдите к шагу 2.
  • Шаг 2: Зарегистрируйте транзакцию филиала и добавьте глобальную блокировку, а также создайте исключение, если глобальная блокировка не заблокируется.
  • Шаг 3: Еслиcontextимеютundolog, тоUnlogслить в базу.
  • Шаг 4: Зафиксируйте локальную транзакцию.
  • Шаг 5: Сообщите о состоянии локальной транзакции, сообщите о сбое, если есть исключение, и сообщите о нормальном состоянии, если проблем нет.

Выше описан процесс отправки транзакции, когдаcontextВ процессе глобальной блокировки будет выполняться запрос глобальной блокировки, который относительно прост и здесь повторяться не будет.contextВ приведенном выше случае транзакция также не будет зафиксирована напрямую.

о насrollbackКод относительно прост:

    public void rollback() throws SQLException {
        targetConnection.rollback();
        if (context.inGlobalTransaction()) {
            if (context.isBranchRegistered()) {
                report(false);
            }
        }
        context.reset();
    }

  • Шаг 1: Сначала зафиксируйте локальную транзакцию.
  • Шаг 2: Определите, входит ли он в глобальную транзакцию.
  • Шаг 3: Если да, определите, была ли зарегистрирована транзакция филиала.
  • Шаг 4: Если он был зарегистрирован, сообщите об исключении сбоя транзакции непосредственно клиенту.

Внимательные друзья могут обнаружить, что если наша локальная транзакция завершится ошибкой после фиксации или отката, будет ли результат нашей распределенной транзакции правильным? Здесь не о чем беспокоиться, и наш сервер имеет идеальное обнаружение тайм-аута, повторные попытки и другие механизмы, которые помогают нам справляться с этими особыми ситуациями.

4.1.3 StatementProxy

Обычно мы используемstatementпозвонюexecuteXXXметод для выполнения нашегоsqlзаявление, поэтому в нашемProxyВы можете использовать этот метод вsqlПри выполнении некоторой логики, которую нам нужно сделать, давайте посмотримexecuteКод метода:

    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
            @Override
            public Boolean execute(T statement, Object... args) throws SQLException {
                return statement.execute((String) args[0]);
            }
        }, sql);
    }

Здесь логика напрямую передана намExecuteTemplateДля выполнения есть следующий код:

    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException)ex;
        }
        return rs;
    }
}

Вот наша реализация проксиsqlОсновная логика, шаги следующие:

  • Шаг 1: Если вы не участвуете в глобальной транзакции и вам не нужно запрашивать глобальную блокировку, выполните оригинал напрямую.Statement.
  • Шаг 2: Если нет входящегоsqlраспознаватель, то нам нужно сгенерироватьsqlРаспознаватель, здесь мы позаимствуем Друидаsql, мы получилиsqlРаспознаватель , мы можем получить различные типыsqlНекоторые условия утверждения, такие какSQLUpdateRecognizerиспользуется дляupdateизsqlРаспознаватель, мы можем напрямую получить имя таблицы, условный оператор, обновленное поле, обновленное значение поля и т. д.
  • Шаг 3: СогласноsqlТипы распознавателей для создания наших различных типов исполнителей.
  • Шаг 4: Выполните наш оператор sql через исполнителя на третьем шаге.

Вот пятьExecutor:INSERT,UPDATE,DELETEИсполнитель запишет отмену и глобальную блокировку,SELECT_FOR_UPDATEТолько запрашивать глобальную блокировку, есть значение по умолчанию, которое мы сейчас не поддерживаем, ничего не будем делать и напрямую выполним нашsqlутверждение.

Для INSERT, UPDATE, DELETE исполнители будут наследовать нашиAbstractDMLBaseExecutor:

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected abstract TableRecords beforeImage() throws SQLException;


    protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
    

существуетAbstractDMLBaseExecutorвыполнять логику вexecuteAutoCommitFalseЭтот метод, шаги следующие:

  • Шаг 1: Получите текущее выполнениеsqlСнимок затронутых строк ранее, здесьbeforeImageБудут переопределены различными типами операторов SQL.
  • Шаг 2: Выполните текущийsqlутверждение и получить результат.
  • Шаг 3. Получите исполнениеsqlСнимки позже, здесьafterIamgeТакже перереализован различными типами операторов SQL.
  • Шаг 4: ПоместитеundologГотово, здесь будет сохранен нашConnectionContextсередина.
    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
            return;
        }

        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }

ПодготовитьUndoLogполучит нашConnectionProxy, поставь нашUndologа такжеLockKeyСохраните его для последующих локальных транзакцийcommitа такжеrollbackиспользовать, как было сказано выше.

4.1.4 Фиксация и откат транзакций ветки

Вышеупомянутые 4.1.1-4.1.3 все относятся к первому этапу нашей распределенной транзакции, которая заключается в регистрации нашей транзакции филиала дляServer, в то время как фиксация ветки второго этапа и откат ветки находятся в нашемDataSourceManager, есть следующий код для фиксации транзакции ветки:

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

Здесь информация, представленная нашей транзакцией ветки, ставится в очередь и обрабатывается асинхронно, то есть мы удаляем нашу ветку асинхронно.undologданные, так как после отправкиundologДанные бесполезны.

Здесь можно спросить, что если машина выйдет из строя при асинхронной подаче этой информации в очередь, то асинхронное удаление не будет выполненоundologлогика, то этоundologСтанут ли они постоянными грязными данными? здесьSeataЧтобы этого не произошло, некоторые старые данные undolog будут регулярно сканироваться, а затем удаляться, что не будет загрязнять наши данные.

Для отката транзакции нашей ветки у нас есть следующий код:

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }

Здесь мы сначала получим наш источник данных, а затем вызовем наш менеджер журналов повторов.undoметод повтора журнала,undoМетод длинный и не будет опубликован здесь Основная логика состоит в том, чтобы найти нашundologЗатем повторите снимок внутри нашей базы данных.

4.2 Управление ресурсами ТСС

TCCнетATУправление ресурсами режима настолько сложное, что часть основной логики находится в предыдущемInterceptorЭто было объяснено в , таких как сохранение двухэтапного метода. В основном здесьTCCТранзакция ветки фиксируется, а транзакция ветки откатывается вTCCResourceManagerЕсть:

	public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
									 String applicationData) throws TransactionException {
		TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
		if (tccResource == null) {
			throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
		}
		Object targetTCCBean = tccResource.getTargetBean();
		Method commitMethod = tccResource.getCommitMethod();
		if (targetTCCBean == null || commitMethod == null) {
			throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
		}
		boolean result = false;
		//BusinessActionContext
		BusinessActionContext businessActionContext =
				getBusinessActionContext(xid, branchId, resourceId, applicationData);
		Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
		LOGGER.info("TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:" +
				resourceId);
		if (ret != null && ret instanceof TwoPhaseResult) {
			result = ((TwoPhaseResult) ret).isSuccess();
		} else {
			result = (boolean) ret;
		}
		return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
	}

Выполните следующие действия:

  • Шаг 1: Сначала выясните, имеет ли текущая службаTCCресурс, если не выдается исключение.
  • Шаг 2: Затем найдите наш объект TCC и соответствующийcommitметод.
  • Шаг 3: Затем выполните нашcommitметод.
  • Шаг 4: Наконец, верните результат в нашServer,Зависит отServerРешите, стоит ли попробовать еще раз.

здесьbranchRollbackЭтот метод также относительно прост, поэтому я не буду проводить здесь слишком много анализа.

Суммировать

Из вышеприведенного анализа мы знаем, чтоSeataИнициализация зависит отSpringЧтобы продолжить, открытие/фиксация/откат наших глобальных транзакций зависит от нашего менеджера транзакций TM, а управление транзакциями наших филиалов зависит от нас.RM, который обеспечивает два режимаATа такжеTCC,ATСхема должна использовать базу данных, а ее основная реализация заключается в реализации прокси для источника данных, внедряя в него нашу собственную логику. Наш TCC может компенсировать тот факт, что мы не используем базу данных, и мы будем реализовывать как отправку, так и откат сами.Основная логика реализации заключается в том, чтобы полагаться на двухэтапный метод ресурса и наш целевой объект в нашем ресурсе. контекст. Сохраните его для использования в будущем.

Наконец, если вы заинтересованы в распределенных транзакциях, вы можете использовать и прочитатьSeataкод и дать нам предложения.

Если вы считаете, что эта статья полезна для вас, то ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O: