предисловие
Несколько дней назад я изучал промежуточное ПО для распределенных транзакций Alibaba с открытым исходным кодом дома.Seata
, и записал процесс.
SpringBoot+Dubbo+Seata распределенная битва транзакций
Однако одного реального боя недостаточно, мы должны до некоторой степени понять принцип, иначе было бы стыдно не знать, как решить проблему.
1. Принцип
Во-первых, представьте традиционное монолитное приложение, которое завершает бизнес, обновляя данные в одном и том же источнике данных через 3 модуля.
Естественно, согласованность данных всего бизнес-процесса гарантируется локальными транзакциями.
Монолитные приложения разбиваются на микросервисы по мере изменения бизнес-требований и архитектуры. Исходные 3 модуля были разделены на 3 независимых сервиса, каждый из которых использует независимые данные.
Бизнес-процесс будет выполняться через сервисные вызовы RPC.
В настоящее время согласованность данных в каждой службе по-прежнему гарантируется локальными транзакциями.
И как обеспечить глобальную согласованность данных и целостность всего бизнес-уровня? Это типичное требование к распределенным транзакциям, с которым сталкивается микросервисная архитектура.
1. Принцип и конструкция
Seata
Понимать распределенную транзакцию кактранзакция филиалаизглобальная транзакция.глобальная транзакцияответственность за координацию подведомственных ему юрисдикцийтранзакция филиалаДостигните соглашения, либо успешно зафиксируйте вместе, либо потерпите неудачу и откатитесь вместе. Кроме того, обычнотранзакция филиалаЭто локальная транзакция, которая удовлетворяет самой ACID.
Seata
Для протоколирования обработки распределенных транзакций определены три компонента.
- Координатор транзакций (TC): координатор транзакций поддерживает текущее состояние глобальной транзакции и отвечает за координацию и управление фиксацией или откатом глобальной транзакции.
- Диспетчер транзакций (TM): контролирует границы глобальных транзакций, отвечает за открытие глобальной транзакции и, наконец, инициирует глобальную фиксацию или разрешение глобального отката.
- Диспетчер ресурсов (RM): контролирует транзакции филиалов, отвечает за регистрацию филиалов, отчеты о состоянии и получает инструкции от координатора транзакций для управления фиксацией и откатом транзакций филиала (локальных).
Типичный процесс распределенной транзакции:
- TM обращается к TC, чтобы открыть глобальную транзакцию, глобальная транзакция успешно создается и генерируется глобально уникальный XID.
- XID распространяется в контексте цепочки вызовов микрослужб.
- RM регистрирует транзакцию филиала в TC и переводит ее в юрисдикцию глобальной транзакции, соответствующей XID.
- TM инициирует глобальную фиксацию или разрешение отката для XID в TC.
- TC планирует все транзакции филиалов под юрисдикцией XID для выполнения запроса на фиксацию или откат.
2. В режиме
Seata предлагает 4 решения для распределенных транзакций, а именно режим AT, режим TCC, режим Saga и режим XA.
В нашем примере проекта используется режим AT. В режиме AT пользователям нужно обращать внимание только на свой «бизнес-SQL». «Бизнес-SQL» пользователя используется в качестве первого этапа, а платформа Seata автоматически генерирует операции фиксации и отката транзакции второго этапа.
- Первый этап:
На первом этапе Seata перехватит «бизнес-SQL», сначала проанализирует семантику SQL, найдет «бизнес-SQL». SQL», чтобы обновить бизнес-данные, перед обновлением бизнес-данных сохраните их как «изображение до», затем выполните «бизнес-SQL», чтобы обновить бизнес-данные, после обновления бизнес-данных сохраните их как «изображение после», Наконец, создается блокировка строки.Все вышеуказанные операции выполняются в рамках транзакции базы данных, что обеспечивает атомарность одноэтапной операции.
- Вторая фаза фиксации:
Если отправлен второй этап, поскольку «бизнес-SQL» был отправлен в базу данных на первом этапе, платформе Seata нужно только удалить данные моментального снимка и блокировки строк, сохраненные на первом этапе, чтобы завершить очистку данных.
- Двухэтапный откат:
Если второй этап представляет собой откат, Seata необходимо выполнить откат «бизнес-SQL», который был выполнен на первом этапе, чтобы восстановить бизнес-данные. Метод отката заключается в восстановлении бизнес-данных с помощью образа «до».
Давайте посмотрим, как весь процесс связан из исходного кода.
2. Создание локальной среды
Чтобы облегчить просмотр исходного кода, мы должны сначала настроить среду отладки для облегчения отладки.
Исходный код Seata:github.com/seata/seata.
Текущая версия0.7.0-SNAPSHOT
, то черезmvn install
Упакуйте проект локально.
нашSpringBoot+Seata
Тестовый проект может ввести эту зависимость.
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.7.0-SNAPSHOT</version>
</dependency>
почему ты хочешь сделать это? потому чтоSeata
Связь между различными компонентамиNetty
По полной, при отладке часто отключается из-за таймаута.
С введением локальной версии мы можем увеличить время обнаружения сердцебиения или просто удалить его, просто сделайте это~
1. Запустите сервер
оказатьсяio.seata.server.Server
, запустить напрямуюmain
метод, запустите сервис Seata, так просто~
мы сказали вышеSeata
Определены три компонента, один из которых — координатор транзакций с именем TC, который относится к серверу.
Давайте посмотрим, что именно он делает.
public class Server {
public static void main(String[] args) throws IOException {
//初始化参数解析器
ParameterParser parameterParser = new ParameterParser(args);
//初始化RpcServer ,设置服务器参数
RpcServer rpcServer = new RpcServer(WORKING_THREADS);
rpcServer.setHost(parameterParser.getHost());
rpcServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(1);
//从文件或者数据库中加载Session
SessionHolder.init(parameterParser.getStoreMode());
//初始化默认的协调器
DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
coordinator.init();
rpcServer.setHandler(coordinator);
//注册钩子程序 清理协调器相关资源
ShutdownHook.getInstance().addDisposable(coordinator);
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
XID.setIpAddress(parameterParser.getHost());
} else {
XID.setIpAddress(NetUtil.getLocalIp());
}
XID.setPort(rpcServer.getListenPort());
//启动RPC服务
rpcServer.init();
System.exit(0);
}
}
здесьRpcServer
Это сервер RPC, реализованный Netty для получения и обработки сообщений от TM и RM. В этой статье основное внимание уделяется не серверной стороне, поэтому сначала мы можем составить общее впечатление.
3. Конфигурация клиента
В проекте мы настроилиSeataConfiguration
, который фокусируется на настройке глобальных сканеров транзакций и агентов источников данных. Итак, давайте сначала посмотрим, почему они настроены и что они делают.
1. Сканер транзакций
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
return new GlobalTransactionScanner("springboot-order", "my_test_tx_group");
}
По правилам, смотрим на класс, сначала смотрим на его структуру. Например, чей это сын, откуда он взялся и куда хочет уйти?
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,DisposableBean {
}
Здесь мы видим, что этоAbstractAutoProxyCreator
Подкласс , который снова реализуетInitializingBean
интерфейс.
Эти два приятеля являются членами семейства Spring, один используется для прокси генерации Spring AOP, другой используется для вызова метода инициализации Bean.
- InitializingBean
Существует три способа инициализации bean-компонентов по порядку:@PostConstruct、afterPropertiesSet、init-method
.
Здесь, в своем методе инициализации, он в основном делает три вещи.
private void initClient() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//init TM 初始化事务管理器
TMClient.init(applicationId, txServiceGroup);
//init RM 初始化资源管理器
RMClient.init(applicationId, txServiceGroup);
//注册钩子程序,用于TM、RM的资源清理
registerSpringShutdownHook();
}
Пока всплыли все три компонента, определенные Seata.
TMClient.init
В основном это инициализация клиента диспетчера транзакций, установка соединения с RPC-сервером и одновременная регистрация у координатора транзакций.
RMClient.init
Это тот же процесс, инициализация диспетчера ресурсов, установление соединения с RPC-сервером и регистрация у координатора транзакций.
В то же время все они подключены через временные задачи, поэтому их можно автоматически переподключать после отключения.
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, 5, 5, TimeUnit.SECONDS);
Наконец, зарегистрируйте хуки для очистки ресурсов в этих двух компонентах.
- AbstractAutoProxyCreator
На самом деле это постпроцессор бина, после инициализации бина он называетсяpostProcessAfterInitialization
метод.
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return this.wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
затем вGlobalTransactionScanner.wrapIfNecessary()
Что он сделал?
Это проверка того, содержит ли метод BeanGlobalTransactional
а такжеGlobalLock
Аннотируйте, а затем создайте прокси-класс.
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey){
if (disableGlobalTransaction) {
return bean;
}
//已经生成了代理,直接返回
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//检查是不是TCC的代理
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
//判断类方法上是否包含GlobalTransactional注解和GlobalLock注解
if (!existsAnnotation(new Class[] {serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
//创建拦截器
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}
//如果不是AOP代理,则创建代理;如果是代理,则将拦截器加入到Advisor
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
Пока мы остановились на одном. насServiceImpl
класс реализации сGlobalTransactional
Аннотированные методы будут генерировать прокси-класс.
При вызове метода фактически вызывается метод-перехватчик прокси-класса.invoke()
.
public class GlobalTransactionalInterceptor implements MethodInterceptor {
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//获取目标类
Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis());
//获取调用的方法
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取方法上的注解
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
//处理全局事务
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
}
Как видите, именно здесь начинается глобальная транзакция. Давайте не будем вникать в это здесь, а потом посмотрим вниз.
2. Агент источника данных
В дополнение к прокси для метода, созданного выше, создайте прокси для источника данных, затем установите прокси-объект вSqlSessionFactory
.
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
Суть здесь в том, чтобы создатьDataSourceProxy
, и установите его наMybatis
серединаSqlSessionFactory
.
мы знаем, что вMybatis
Когда метод выполняется, наконец, необходимо создатьPreparedStatement
объект, а затем выполнитьps.execute()
Возврат результатов SQL.
Здесь нам нужно обратить внимание на две вещи:
- Создание подготовленного заявления
PreparedStatement
объект изConnection
Объект создан, возможно мы написали:
PreparedStatement pstmt = conn.prepareStatement(insert ........)
- Создание соединения
Connection
Откуда это? Это мы не должны стесняться, конечно, мы можем получить соединение с источником данных.
Однако мы поместили источник данныхDataSource
объект был заменен наSeata
серединаDataSourceProxy
объект.
так,Connection和PreparedStatement
На момент создания было сделаноSeata
Прокси-объект в .
Если вы мне не верите, см.:
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
}
тогда позвониAbstractDataSourceProxy
создаватьPreparedStatement
.
public abstract class AbstractConnectionProxy implements Connection {
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
}
Увидев это, мы должны понять одну вещь.
в исполненииps.execute()
, он позвонитPreparedStatementProxy.execute()
.
Прояснив логику файла конфигурации, вы сможете понять его контекст, а изучив код, вы поймете, с чего начать.
В-четвертых, реализация метода
Как указано выше,ServiceImpl
Это уже прокси-класс, так что давайте посмотрим непосредственно наGlobalTransactionalInterceptor.invoke()
.
он позвонитTransactionalTemplate.execute()
,TransactionalTemplate
Шаблон для бизнес-логики и глобальных транзакций.
public class TransactionalTemplate {
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. 创建一个全局事务
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 获取事务的属性 比如超时时间、事务名称
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. 开始事务
beginTransaction(txInfo, tx);
Object rs = null;
try {
// 执行业务逻辑
rs = business.execute();
} catch (Throwable ex) {
// 3.回滚
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. 提交
commitTransaction(tx);
return rs;
} finally {
//5. 清理资源
triggerAfterCompletion();
cleanUp();
}
}
}
Код здесь очень понятен, и ход транзакции ясен с первого взгляда.
- Создайте глобальную транзакцию и задайте свойства транзакции
- открыть транзакцию
- Выполнять бизнес-логику
- Если возникает исключение, откатите транзакцию, в противном случае зафиксируйте транзакцию.
- очистить ресурсы
Давайте посмотрим, как он это делает.
1. Начать бизнес
С точки зрения клиента, открытие транзакции означает сообщение серверу: Я хочу начать глобальную транзакцию, пожалуйста, попросите г-на TC, координатора транзакции, назначить мне глобальный идентификатор транзакции.
Mr. TC создаст глобальный сеанс на основе имени приложения, группы транзакций, имени транзакции и т. д., а также сгенерирует XID глобальной транзакции.
Затем клиент записывает текущий статус транзакции какBegin
и привяжите XID к текущему потоку.
2. Выполнить бизнес-логику
После открытия транзакции мы начинаем выполнять собственную бизнес-логику. Это включает в себя операции с базой данных, как мы упоминали выше.Seata
уже поставилPreparedStatement
Объект является прокси. Поэтому, когда он выполняется, он вызываетPreparedStatementProxy.execute()
.
public class PreparedStatementProxy{
public boolean execute() throws SQLException {
return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
@Override
public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
return statement.execute();
}
});
}
}
Здесь он сначала сгенерирует разные исполнители в соответствии с типом SQL. напримерINSERT INTO
приговор, тоInsertExecutor
Актуатор.
Затем оцените, отправляется ли он автоматически, и выполните соответствующий метод. Тогда см.executeAutoCommitFalse()
public abstract class AbstractDMLBaseExecutor{
protected T executeAutoCommitFalse(Object[] args) throws Throwable {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
}
Вот что делает режим AT на первом этапе, перехватывая бизнес-SQL и сохраняя его какbeforeImage
; Затем выполните бизнес-SQL и сохраните его какafterImage
. Все эти операции выполняются в локальной транзакции, что обеспечивает атомарность однофазных операций.
мы начинаем сINSERT INTO
Например, посмотрите, как это делается.
- beforeImage
Поскольку это новая операция, эта запись не существует до выполнения, а beforeImage — это просто пустая запись в таблице.
- Бизнес SQL
Выполните исходный оператор SQL, напримерINSERT INTO ORDER(ID,NAME)VALUE(?,?)
- afterImage
Все, что ему нужно сделать, это найти запись, только что добавленную в базу данных.
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
//查找主键ID的值
List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto();
//根据主键ID查找记录
TableRecords afterImage = getTableRecords(pkValues);
return afterImage;
}
потомbeforeImage
а такжеafterImage
встроить вUndoLog
объект, сохраненный в базе данных. Важно отметить, что все эти операции выполняются в рамках одной и той же локальной транзакции. Мы также можем увидеть это, взглянув на его sqlList.
Наконец, давайте посмотрим наUndoLog
Записи в базе данных выглядят так:
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.216.1:8091:2016493467",
"branchId": 2016493468,
"sqlUndoLogs": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "INSERT",
"tableName": "t_order",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", []]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_order",
"rows": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": ["java.util.ArrayList", [{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PrimaryKey",
"type": 4,
"value": 116
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "order_no",
"keyType": "NULL",
"type": 12,
"value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "user_id",
"keyType": "NULL",
"type": 12,
"value": "200548"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "commodity_code",
"keyType": "NULL",
"type": 12,
"value": "HYD5620"
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "count",
"keyType": "NULL",
"type": 4,
"value": 10
}, {
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "amount",
"keyType": "NULL",
"type": 8,
"value": 5000.0
}]]
}]]
}
}]]
}
3. Отправить
Если в выполнении бизнеса нет отклонений, он войдет во второй этап представления. Клиент отправляет на сервер событие Commit и одновременно отвязывает XID.
После того, как сервер ответит, чтобы подтвердить отправку, клиент очищает локальные данные UndoLog.
здесь важноAsyncWorker.init()
метод, он запустит запланированную задачу для выполненияdoBranchCommits
, чтобы очистить данные журнала.
4. Откат
При возникновении исключения выполняется двухэтапный откат.
Сначала найдите запись UnDoLog через xid и branchId, затем проанализируйте данные внутри, чтобы сгенерировать обратный SQL, и отзовите результат выполнения прямо сейчас.
Этот код длинный, пожалуйста, обратитесь к нему самиUndoLogManager.undo()
а такжеAbstractUndoExecutor.executeOn()
метод.
5. Как общаться с Даббо
Только один диспетчер транзакций TM может открыть глобальную транзакцию, так как же другие участники службы автоматически вовлекаются в глобальную транзакцию?
первый,Seata
Сделал фильтр для Dubbo под названиемTransactionPropagationFilter
.
это будет вDubbo RPC
Установите XID в контексте, чтобы другие службы также могли получить этот XID.
Затем, как мы знаем, Seata проксировалаPreparedStatement
. При выполнении операций с данными существует суждение.
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
//如果不包含XID,就执行原始方法
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
Смысл здесь в том, что если текущий поток не содержит XID, выполнить исходный метод, если содержит, продолжить выполнение метода транзакции.
V. Резюме
В этой статье изложен принцип работы клиента в режиме Seata TA. Также есть часть логики сервера Seata, которая подробно не рассматривается в этой статье.
Причина в том, что автор не до конца понял эту часть содержания, поэтому нет возможности расписать по-простому, а я добавлю позже~
Если в тексте есть неточности, я тоже надеюсь, что друзья дадут мне какой-нибудь совет, спасибо.