1. Предпосылки
В предыдущих статьях было представлено общее введение Seata, как его использовать и принципиальный анализ Seata-Server.Если вам интересно, вы можете прочитать следующие статьи:
- Углубленный анализ универсального решения для распределенных транзакций Seata-Server
- Платформа расшифровки распределенных транзакций — Fescar
В этой статье будут представлены две другие важные роли в Seata.TM
(менеджер по сделкам) иRM
(Исследователь), сначала давайте посмотрим на следующую картинку:
TC
Принцип подробно описан, для ТМ и РМ мы видим, что они принадлежатclient
роли, их соответствующие функции заключаются в следующем:
-
TM
(Диспетчер транзакций): используется для управления всей распределенной транзакцией и инициирования глобальных транзакций.Begin/Commit/Rollback
. -
RM(资源管理器)
: используется для регистрации вашей собственной транзакции филиала, принятьTC
изCommit
илиRollback
просить.
2.Seata-Spring
Сначала давайте представим некоторыеSeata-client
серединаSpring
модуль,Seata
Через этот модуль к себеTM
а такжеRM
Выполните инициализацию и сканирование аннотаций для режима AT и режима TCC, а также инициализируйте ресурсы, необходимые для этих режимов.
существуетSeata
один из проектовspring
модуль, который содержит наши иspring
родственная логика,GlobalTransactionScanner
это основной класс:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements InitializingBean,ApplicationContextAware,
DisposableBean
Приведенный выше код является определением класса, в первую очередь он наследуетAbstractAutoProxyCreator
ДостигнутоwrapIfNecessary
метод реализует аспектный прокси для нашего метода, который реализуетInitializingBean
Интерфейс используется для инициализации нашего клиента, который реализуетApplicationContextAware
чтобы спасти нашуspring
контейнер, реализованныйDisposableBean
для плавного выключения.
Сначала взгляните на wrapIfNecessary, который наследует реализацию AbstractAutoProxyCreator.
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
- Шаг 1: проверьте текущий
beanName
Разобрались ли с этим? Если с этим разобрались на этот раз, с ним не разберутся. - Шаг 2: По аннотации найдите соответствующий режим
Inteceptor
, тут три случая первыйTCC
, второй - перехватчик глобальной ТМ управления транзакциями, третий - нет аннотации, если нет, то вернитесь напрямую. - Шаг 3: Поместите соответствующий
interceptor
добавить к текущемуBean
.
Тогда посмотрите изInitializingBean
реализовано вafterPropertiesSet
, это верноSeata
инициализация:
public void afterPropertiesSet() {
initClient();
}
private void initClient() {
//init TM
TMClient.init(applicationId, txServiceGroup);
//init RM
RMClient.init(applicationId, txServiceGroup);
registerSpringShutdownHook();
}
private void registerSpringShutdownHook() {
if (applicationContext instanceof ConfigurableApplicationContext) {
((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
ShutdownHook.removeRuntimeShutdownHook();
}
ShutdownHook.getInstance().addDisposable(TmRpcClient.getInstance(applicationId, txServiceGroup));
ShutdownHook.getInstance().addDisposable(RmRpcClient.getInstance(applicationId, txServiceGroup));
}
Логика приведенного выше кода относительно ясна:
- Шаг 1: Инициализировать
TM
клиент, сюда будут отправленыServer
зарегистрироватьTM
. - Шаг 2: Инициализировать
RM
Клиент, здесь будет зарегистрирован сервер с серверомRM
. - Шаг 3: Зарегистрируйтесь
ShutdownHook
, за которым последуетTM
а такжеRM
Закройте изящно.
Обратите внимание, что при инициализации здесь будут инициализированы два клиента, а именноTM
клиент иRM
клиент, думают многиеTM
а такжеRM
Это тот же клиент, который используется, поэтому вам нужно обратить внимание на это.
2.1 Interceptor
В первой части приведенной выше логики мы видим, что у нас есть два бизнес-ядра.Interceptor
,одинGlobalTransactionalInterceptor
Используется для управления глобальными транзакциями (открытие, фиксация, откат), другойTccActionInterceptor
Используется для обработки режима TCC. Знакомые с Seata друзья спросят про режим AT, почему только режим TCC, здесь режим AT представляет собой автоматическую обработку транзакций, нам не нужны аспекты
2.1.1 GlobalTransactionalInterceptor
Сначала взгляните на GlobalTransactionalInterceptor#invoke:
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
- Шаг 1: Получите оригинал из прокси-класса
Method
- Шаг 2: получить
Method
аннотации в - Шаг 3: Если есть
@GlobalTransactional
Аннотация выполняет логику аспекта handleGlobalTransaction, которая также является логикой нашей глобальной транзакции. - Шаг 4: Если есть
@GlobalLock
Аннотация, выполняется логика аспекта handleGlobalLock. Эта аннотация используется для некоторой блокировки базы данных, отличной от режима AT. После добавления этой аннотации он будет запрашивать, заблокированы ли соответствующие данные перед выполнением инструкции Sql, но не присоединится к глобальной транзакции. .
handleGlobalTransaction
Логика следующая:
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
});
}
TransactionalTemplate#execute
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
существуетhandleGlobalTransaction
Лейтенант даст конкретную реализациюTransactionalTemplate#execute
сделано, конкретные шаги заключаются в следующем:
- Шаг 1: Получите текущую глобальную транзакцию, если нет, создайте ее.
- Шаг 2: Получите информацию о транзакциях в бизнесе, включая время ожидания и т. д.
- Шаг 3: Откройте глобальную транзакцию
- Шаг 4: Если для обработки исключения было создано исключение, выполните откат.
- Шаг 5: Если исключения нет, зафиксируйте глобальную транзакцию.
- Шаг 6: Очистите текущую информацию о контексте транзакции.
2.1.2 TccActionInterceptor
Давайте сначала посмотрим, как используется TccActionInterceptor:
@TwoPhaseBusinessAction(name = "TccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
Вообще говоря, будет определено три метода, один — это метод try этапа, а другой — фиксация и откат второго этапа.Первый параметр каждого метода — контекст нашей транзакции.Здесь нам не нужно заботиться что он будет самодостаточным в нашем аспекте.
Далее давайте посмотрим, как обрабатываются перехватчики, связанные с TCC:
public Object invoke(final MethodInvocation invocation) throws Throwable {
Method method = getActionInterfaceMethod(invocation);
TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
//try method
if(businessAction != null) {
if(StringUtils.isBlank(RootContext.getXID())){
//not in distribute transaction
return invocation.proceed();
}
Object[] methodArgs = invocation.getArguments();
//Handler the TCC Aspect
Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, businessAction, new Callback<Object>(){
@Override
public Object execute() throws Throwable {
return invocation.proceed();
}
});
//return the final result
return ret.get(Constants.TCC_METHOD_RESULT);
}
return invocation.proceed();
}
- Шаг 1: Получите оригинал
Method
. - Шаг 2: Определите, входит ли он в глобальную транзакцию, то есть выполняется ли самый внешний уровень всей логической службы
GlobalTransactionalInterceptor
. Если он больше не выполняется напрямую. - Шаг 3: Выполнить
TCC
Аспект, основная логика находится вactionInterceptorHandler#proceed
середина.
посмотри сноваactionInterceptorHandler#proceed
Сюда:
public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(16);
//TCC name
String actionName = businessAction.name();
String xid = RootContext.getXID();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName)
//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}
- Шаг 1: Получите некоторую информацию о транзакции, такую как
TCC
имя, эта транзакцияXID
Ждать. - Шаг 2: Создайте
Branch
транзакции, одна местнаяcontext
поместите это в контекстcommit
а такжеrollback
информация хранится, другая для нашегоSeata-Server
Зарегистрируйте транзакции филиала для последующего управления. - Шаг 3: Заполните параметры метода, который является нашим
BusinessActionContext
.
2.2 Резюме
Было проанализировано несколько общих компонентов Spring.Основных классов в основном три, одинScanner
,дваInterceptor
. В целом это относительно просто. По сути, то, что делает Spring, это некоторая инициализация нашего клиента. Далее, давайте более подробно рассмотрим роль TM.
3. Менеджер транзакций ТМ
В предыдущей главе мы говорили оGlobalTransactionalInterceptor
В этом аспекте перехватчика мы знаем, что этот перехватчик делает то, что должен делать наш TM, например, открытие транзакции, фиксацию транзакции и откат транзакции. Это только отправная точка нашей общей логики. Конкретная клиентская логика находится в нашем DefaultTransactionManager. Код в этом классе выглядит следующим образом:
public class DefaultTransactionManager implements TransactionManager {
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalCommitRequest globalCommit = new GlobalCommitRequest();
globalCommit.setXid(xid);
GlobalCommitResponse response = (GlobalCommitResponse)syncCall(globalCommit);
return response.getGlobalStatus();
}
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
globalRollback.setXid(xid);
GlobalRollbackResponse response = (GlobalRollbackResponse)syncCall(globalRollback);
return response.getGlobalStatus();
}
@Override
public GlobalStatus getStatus(String xid) throws TransactionException {
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
queryGlobalStatus.setXid(xid);
GlobalStatusResponse response = (GlobalStatusResponse)syncCall(queryGlobalStatus);
return response.getGlobalStatus();
}
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
} catch (TimeoutException toe) {
throw new TransactionException(TransactionExceptionCode.IO, toe);
}
}
}
существуетDefaultTransactionManager
Общая логика относительно проста, и есть четыре метода:
-
begin
:КServer
положить началоGlobalBeginRequest
Запрос на запуск глобальной транзакции. -
commit
:КServer
положить началоGlobalCommitRequest
Запрос на совершение глобальной транзакции. -
rollback
:КServer
положить началоGlobalRollbackRequest
Запрос на откат глобальной транзакции. -
getStatus
:КServer
положить началоGlobalStatusRequest
Запрос на запрос глобальной информации о статусе транзакции.
4. Обозреватель РМ
существуетSeata
текущий менеджментRM
Есть два режима: одинAT
Режим, который требует поддержки транзакционной базы данных, будет автоматически записывать моментальный снимок до модификации и моментальный снимок после модификации для фиксации и отката.TCC
режим, который также можно рассматривать какMT
режим, для случаев, когда режим AT не поддерживается, выполните фиксацию и откат вручную. Далее мы подробно проанализируем принципы реализации этих двух режимов.
4.1 Управление ресурсами АТ
AT
нужно использовать в режимеSeata
Общая логика реализации предоставленного агента источника данных показана на следующем рисунке:
В нашей программе выполнитьsql
утверждение, используете ли выmybatis
, или использовать напрямуюjdbcTemplate
, выполните следующие действия:
- Шаг 1: Получите подключение к базе данных из источника данных.
- Шаг 2: Получите его из соединения
Statement
. - Шаг 3: Выполните наше заявление через Заявление
sql
утверждение
Таким образом, мы можем положитьDataSource
,Connection
,Statement
Прокси, а затем выполнить некоторые из нашей специальной логики, чтобы завершить наш режим AT.
4.1.1 DataSourceProxy
В DataSourceProxy не так много бизнес-логики, просто получитьConnection
используйте нашConnectionProxy
Прокси-класс инкапсулирован, и код выглядит следующим образом:
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
Сначала через наш прокси доDataSource
получить соединение, затем использоватьConnectionProxy
прокси это.
4.1.2 ConnectionProxy
ConnectionProxy
В основном делайте три вещи, первая — генерировать агентовStatement
, второе — сохранить наш контекст соединения: заблокированный ключ, undoLog и т. д., третье — прокси для выполнения нашей локальной транзакцииcommit
а такжеrollback
.
Сначала посмотрите на сгенерированный проксиStatement
:
@Override
public Statement createStatement() throws SQLException {
Statement targetStatement = getTargetConnection().createStatement();
return new StatementProxy(this, targetStatement);
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
Здесь также напрямую генерируется через наше исходное соединениеStatement
, а затем прокси.
Далее, давайте взглянем на управление нашим контекстом.Всем известно, что одна из наших транзакций на самом деле соответствует соединению с базой данных.sql
изundolog
а такжеlockKey
регистрируются в контексте соединения. Как показано в следующем коде:
/**
* append sqlUndoLog
*
* @param sqlUndoLog the sql undo log
*/
public void appendUndoLog(SQLUndoLog sqlUndoLog) {
context.appendUndoItem(sqlUndoLog);
}
/**
* append lockKey
*
* @param lockKey the lock key
*/
public void appendLockKey(String lockKey) {
context.appendLockKey(lockKey);
}
Код здесь простой,lockKey
а такжеundolog
все используютlist
сохранить, напрямуюadd
Вот и все.
Когда наша локальная транзакция завершена, нам нужно вызватьConnection
изcommit
илиrollback
для фиксации или отката транзакции. Здесь нам также нужно проксировать эти два метода, чтобы завершить нашу обработку транзакций филиала, давайте сначала посмотримcommit
метод.
public void commit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e);
}
try {
if (context.hasUndoLog()) {
UndoLogManager.flushUndoLogs(this);
}
targetConnection.commit();
} catch (Throwable ex) {
report(false);
if (ex instanceof SQLException) {
throw new SQLException(ex);
}
}
report(true);
context.reset();
}
- Шаг 1: Суждение
context
Входит ли он в глобальную транзакцию, если да, отправьте его и перейдите к шагу 2. - Шаг 2: Зарегистрируйте транзакцию филиала и добавьте глобальную блокировку, а также создайте исключение, если глобальная блокировка не заблокируется.
- Шаг 3: Если
context
имеютundolog
, тоUnlog
слить в базу. - Шаг 4: Зафиксируйте локальную транзакцию.
- Шаг 5: Сообщите о состоянии локальной транзакции, сообщите о сбое, если есть исключение, и сообщите о нормальном состоянии, если проблем нет.
Выше описан процесс отправки транзакции, когдаcontext
В процессе глобальной блокировки будет выполняться запрос глобальной блокировки, который относительно прост и здесь повторяться не будет.context
В приведенном выше случае транзакция также не будет зафиксирована напрямую.
о насrollback
Код относительно прост:
public void rollback() throws SQLException {
targetConnection.rollback();
if (context.inGlobalTransaction()) {
if (context.isBranchRegistered()) {
report(false);
}
}
context.reset();
}
- Шаг 1: Сначала зафиксируйте локальную транзакцию.
- Шаг 2: Определите, входит ли он в глобальную транзакцию.
- Шаг 3: Если да, определите, была ли зарегистрирована транзакция филиала.
- Шаг 4: Если он был зарегистрирован, сообщите об исключении сбоя транзакции непосредственно клиенту.
Внимательные друзья могут обнаружить, что если наша локальная транзакция завершится ошибкой после фиксации или отката, будет ли результат нашей распределенной транзакции правильным? Здесь не о чем беспокоиться, и наш сервер имеет идеальное обнаружение тайм-аута, повторные попытки и другие механизмы, которые помогают нам справляться с этими особыми ситуациями.
4.1.3 StatementProxy
Обычно мы используемstatement
позвонюexecuteXXX
метод для выполнения нашегоsql
заявление, поэтому в нашемProxy
Вы можете использовать этот метод вsql
При выполнении некоторой логики, которую нам нужно сделать, давайте посмотримexecute
Код метода:
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, new StatementCallback<Boolean, T>() {
@Override
public Boolean execute(T statement, Object... args) throws SQLException {
return statement.execute((String) args[0]);
}
}, sql);
}
Здесь логика напрямую передана намExecuteTemplate
Для выполнения есть следующий код:
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
if (sqlRecognizer == null) {
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor = null;
if (sqlRecognizer == null) {
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
} else {
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
break;
}
}
T rs = null;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException)ex;
}
return rs;
}
}
Вот наша реализация проксиsql
Основная логика, шаги следующие:
- Шаг 1: Если вы не участвуете в глобальной транзакции и вам не нужно запрашивать глобальную блокировку, выполните оригинал напрямую.
Statement
. - Шаг 2: Если нет входящего
sql
распознаватель, то нам нужно сгенерироватьsql
Распознаватель, здесь мы позаимствуем Друидаsql
, мы получилиsql
Распознаватель , мы можем получить различные типыsql
Некоторые условия утверждения, такие какSQLUpdateRecognizer
используется дляupdate
изsql
Распознаватель, мы можем напрямую получить имя таблицы, условный оператор, обновленное поле, обновленное значение поля и т. д. - Шаг 3: Согласно
sql
Типы распознавателей для создания наших различных типов исполнителей. - Шаг 4: Выполните наш оператор sql через исполнителя на третьем шаге.
Вот пятьExecutor
:INSERT,UPDATE,DELETE
Исполнитель запишет отмену и глобальную блокировку,SELECT_FOR_UPDATE
Только запрашивать глобальную блокировку, есть значение по умолчанию, которое мы сейчас не поддерживаем, ничего не будем делать и напрямую выполним нашsql
утверждение.
Для INSERT, UPDATE, DELETE исполнители будут наследовать нашиAbstractDMLBaseExecutor
:
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
protected abstract TableRecords beforeImage() throws SQLException;
protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;
существуетAbstractDMLBaseExecutor
выполнять логику вexecuteAutoCommitFalse
Этот метод, шаги следующие:
- Шаг 1: Получите текущее выполнение
sql
Снимок затронутых строк ранее, здесьbeforeImage
Будут переопределены различными типами операторов SQL. - Шаг 2: Выполните текущий
sql
утверждение и получить результат. - Шаг 3. Получите исполнение
sql
Снимки позже, здесьafterIamge
Также перереализован различными типами операторов SQL. - Шаг 4: Поместите
undolog
Готово, здесь будет сохранен нашConnectionContext
середина.
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
return;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
ПодготовитьUndoLog
получит нашConnectionProxy
, поставь нашUndolog
а такжеLockKey
Сохраните его для последующих локальных транзакцийcommit
а такжеrollback
использовать, как было сказано выше.
4.1.4 Фиксация и откат транзакций ветки
Вышеупомянутые 4.1.1-4.1.3 все относятся к первому этапу нашей распределенной транзакции, которая заключается в регистрации нашей транзакции филиала дляServer
, в то время как фиксация ветки второго этапа и откат ветки находятся в нашемDataSourceManager
, есть следующий код для фиксации транзакции ветки:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
}
return BranchStatus.PhaseTwo_Committed;
}
Здесь информация, представленная нашей транзакцией ветки, ставится в очередь и обрабатывается асинхронно, то есть мы удаляем нашу ветку асинхронно.undolog
данные, так как после отправкиundolog
Данные бесполезны.
Здесь можно спросить, что если машина выйдет из строя при асинхронной подаче этой информации в очередь, то асинхронное удаление не будет выполненоundolog
логика, то этоundolog
Станут ли они постоянными грязными данными? здесьSeata
Чтобы этого не произошло, некоторые старые данные undolog будут регулярно сканироваться, а затем удаляться, что не будет загрязнять наши данные.
Для отката транзакции нашей ветки у нас есть следующий код:
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
UndoLogManager.undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
Здесь мы сначала получим наш источник данных, а затем вызовем наш менеджер журналов повторов.undo
метод повтора журнала,undo
Метод длинный и не будет опубликован здесь Основная логика состоит в том, чтобы найти нашundolog
Затем повторите снимок внутри нашей базы данных.
4.2 Управление ресурсами ТСС
TCC
нетAT
Управление ресурсами режима настолько сложное, что часть основной логики находится в предыдущемInterceptor
Это было объяснено в , таких как сохранение двухэтапного метода. В основном здесьTCC
Транзакция ветки фиксируется, а транзакция ветки откатывается вTCCResourceManager
Есть:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext =
getBusinessActionContext(xid, branchId, resourceId, applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
LOGGER.info("TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:" +
resourceId);
if (ret != null && ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult) ret).isSuccess();
} else {
result = (boolean) ret;
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
Выполните следующие действия:
- Шаг 1: Сначала выясните, имеет ли текущая служба
TCC
ресурс, если не выдается исключение. - Шаг 2: Затем найдите наш объект TCC и соответствующий
commit
метод. - Шаг 3: Затем выполните наш
commit
метод. - Шаг 4: Наконец, верните результат в наш
Server
,Зависит отServer
Решите, стоит ли попробовать еще раз.
здесьbranchRollback
Этот метод также относительно прост, поэтому я не буду проводить здесь слишком много анализа.
Суммировать
Из вышеприведенного анализа мы знаем, чтоSeata
Инициализация зависит отSpring
Чтобы продолжить, открытие/фиксация/откат наших глобальных транзакций зависит от нашего менеджера транзакций TM, а управление транзакциями наших филиалов зависит от нас.RM
, который обеспечивает два режимаAT
а такжеTCC
,AT
Схема должна использовать базу данных, а ее основная реализация заключается в реализации прокси для источника данных, внедряя в него нашу собственную логику. Наш TCC может компенсировать тот факт, что мы не используем базу данных, и мы будем реализовывать как отправку, так и откат сами.Основная логика реализации заключается в том, чтобы полагаться на двухэтапный метод ресурса и наш целевой объект в нашем ресурсе. контекст. Сохраните его для использования в будущем.
Наконец, если вы заинтересованы в распределенных транзакциях, вы можете использовать и прочитатьSeata
код и дать нам предложения.
Если вы считаете, что эта статья полезна для вас, то ваше внимание и пересылка - самая большая поддержка для меня, O(∩_∩)O: