Что такое Сеата
Seata
Это платформа для распределенных транзакций, недавно открытая компанией Alibaba.github.com/seata/seata. Структура включает в себя группуTXC
(Облачная версия называетсяGTS
) и Ant FinancialTCC
Два режима, всего несколько месяцевGithub
Вверхstar
Их число приближается к 10 000, и в настоящее время это единственное решение для распределенных транзакций, одобренное крупными производителями.
TXC
существуетSeata
он жеAT
Режим, что означает, что метод компенсации автоматически генерируется платформой, которая полностью экранирована для пользователей. Пользователи могут использовать распределенные транзакции, как если бы они использовали локальные транзакции. Недостатком является то, что поддерживаются только реляционные базы данных (в настоящее время поддерживаютсяMySQL
), представляяSeata AT
Службе требуется локальное хранилище таблицrollback_info
, Уровень изоляции по умолчаниюRU
Применимые сценарии ограничены.
TCC
Это не новая концепция, она существует уже давно, пользователи определяют ее поtry/confirm/cancel
Три метода имитируют двухфазную фиксацию на уровне приложения, разница в том, что в TCCtry
Этот метод также должен управлять базой данных для блокировки ресурсов.Последующие два метода компенсации автоматически вызываются платформой для выполнения отправки и отката ресурсов соответственно, что аналогично простому уровню хранения.2PC
Не совсем то же самое. Муравей ФинансовыйSeata
внес свой вкладTCC
Реализация, говорят, что он развивался более десяти лет и широко используется в финансах, торговле, складировании и других областях.
История рождения распределенных транзакций
Ранние приложения имели единую архитектуру. Например, учетная запись, сумма и система заказов, связанные с платежными услугами, обрабатывались одним приложением. Нижний уровень имел доступ к одному и тому же экземпляру базы данных, а операции с естественными транзакциями также были локальными транзакциями.Spring
Это может быть легко реализовано, однако из-за увеличения масштабов один сервис необходимо разделить на три независимых сервиса.RPC
Когда вызов сделан, данные также существуют в разных экземплярах базы данных.Поскольку бизнес-операция включает в себя изменение нескольких данных базы данных в это время, больше нельзя полагаться на локальные транзакции и можно решить только через структуру распределенных транзакций. .
TCC — это решение для распределенных транзакций, оно относится к типу гибкой компенсации, преимущество в том, что оно простое для понимания и толькоtry
Фазовая блокировка обеспечивает лучшую производительность параллелизма, но недостатком является стоимость преобразования кода.
Что такое TCC В этой статье мы не будем вдаваться в подробности, сама концепция TCC не сложна.
Как использовать SeataTCC
Прежде чем анализировать исходный код, мы кратко упомянем следующееSeata TCC
Использование шаблона полезно для последующего понимания всейTCC
Процесс.
Участники Seata TCC
Seata
серединаTCC
Требования режимаTCC
Участники сервиса добавляются на интерфейс@TwoPhaseBusinessAction
примечание, примечаниеTCC
имя интерфейса (глобально уникальное),TCC
интерфейсconfirm
иcancel
Имя метода, используемого для последующих вызовов отражения фреймворка, приведено ниже.TCC
Пример интерфейса:
public interface TccAction {
@TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")
public boolean try(BusinessActionContext businessActionContext, int a, int b);
public boolean confirm(BusinessActionContext businessActionContext);
public boolean cancel(BusinessActionContext businessActionContext);
}
Затем определите класс реализацииImpl
Реализуйте этот интерфейс, чтобы предоставить конкретные реализации для трех методов. Наконец, служба участника публикуется и регистрируется на удаленном конце, в основном для последующих действий.Seata
Фреймворк обращается к участникамconfirm
илиcancel
метод закрывает циклTCC
Дела.
Инициатор Seata TCC
Seata TCC
Инициатор похож на нашу картинку вышеpayment service
, участникам необходимо увеличить бизнес-метод@GlobalTransactional
Аннотация, используемая для открытия глобальной транзакции регистрации аспекта, вызываемой в бизнес-методе.TCC
некоторые из сторонtry
метод, как только бизнес-метод будет успешно вызван,Seata
Framework будет информироватьTC
Перезвоните эти партииconfirm
иcancel
метод.
Анализ исходного кода
Seata
серединаTCC
Исходный код паттерна не сложен, в основном он ориентирован на:
module | class | Функции |
---|---|---|
seata-spring | GlobalTransactionalInterceptor.class | Глобальная логика аспекта транзакции, включая регистрацию глобальных транзакций и получение XID |
seata-spring | TccActionInterceptor.class | Логика аспекта участника TCC |
seata-tcc | TCCResourceManager.class | Разобрать TCC Bean и сохранить ресурсы TCC для последующих обратных вызовов. |
seata-tcc | ActionInterceptorHandler.class | Реализация регистрации транзакций филиала ТСС |
seata-server | DefaultCoordinator.класс, FileTransactionStoreManager.класс | В основном реализация TC, хранение транзакций и т.д. |
Подпишитесь на ресурсы TCC
Seata
ОдинTCC
интерфейс называетсяTCC Resources
, структура которого следующая:
public class TCCResource implements Resource {
private String resourceGroupId = "DEFAULT";
private String appName;
private String actionName; // TCC 接口名称
private Object targetBean; // TCC Bean
private Method prepareMethod; // try 方法
private String commitMethodName;
private Method commitMethod; // confirm 方法
private String rollbackMethodName;
private Method rollbackMethod; // cancel 方法
// …… 省略
}
Seata
Разбор для существования в приложенииTCC Bean
, затем черезparserRemotingServiceInfo
метод созданияTCCResource
объект, а затем вызватьTCCResourceManager
КатегорияregisterResource
метод, будетTCCResource
Объекты сохраняются локальноtccResourceCache
, этоConcurrentHashMap
структура, при прохожденииRmRpcClient
вTCCResource
изresourceId
,address
Подождите, пока информация будет зарегистрирована на сервере для удобства последующих действий.TC
пройти черезRPC
Перезвоните по правильному адресу.
// 解析 TCCResource 的部分代码
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if(isService(bean, beanName)){
try {
// 如果是 TCC service Bean,解析并注册该 resource
Object targetBean = remotingBeanDesc.getTargetBean();
for(Method m : methods){
TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
if(twoPhaseBusinessAction != null){
// 如果有 TCC 参与方注解,定义一个 TCCResource,
TCCResource tccResource = new TCCResource();
tccResource.setActionName(twoPhaseBusinessAction.name());
// TCC Bean
tccResource.setTargetBean(targetBean);
// try 方法
tccResource.setPrepareMethod(m);
// confirm 方法名称
tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
// confirm 方法对象
tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));
// cancel 方法名称
tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
// cancel 方法对象
tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));
// 调用到 TCCResourceManager 的 registerResource 方法
DefaultResourceManager.get().registerResource(tccResource);
}
}
}catch (Throwable t){
throw new FrameworkException(t, "parser remting service error");
}
}
Давайте взглянемTCCResourceManager
изregisterResource
Реализация метода:
// 内存中保存的 resourceId 和 TCCResource 的映射关系
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();
@Override
public void registerResource(Resource resource) {
TCCResource tccResource = (TCCResource) resource;
tccResourceCache.put(tccResource.getResourceId(), tccResource);
// 调用父类的方法通过 RPC 注册到远端
super.registerResource(tccResource);
}
ПосмотримTCCResource
Как зарегистрироваться на сервере:
public void registerResource(Resource resource) {
// 拿到 RmRpcClient 实例,调用其 registerResource 方法
RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}
public void registerResource(String resourceGroupId, String resourceId) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register to RM resourceId:" + resourceId);
}
synchronized (channels) {
for (Map.Entry<String, Channel> entry : channels.entrySet()) {
String serverAddress = entry.getKey();
Channel rmChannel = entry.getValue();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("register resource, resourceId:" + resourceId);
}
// 注册 resourceId,远端将其解析为一个 RpcContext 保存在内存中
sendRegisterMessage(serverAddress, rmChannel, resourceId);
}
}
}
GlobalTransaction регистрирует глобальные транзакции
GlobalTransaction
Аннотация — это точка входа в глобальную транзакцию, а ее аспектная логика реализована вGlobalTransactionalInterceptor
в классе. Если принято решение войти@GlobalTransaction
Модифицированный метод вызоветhandleGlobalTransaction
Метод входит в логику аспекта, где ключевой методtransactionalTemplate
изexecute
метод.
public Object execute(TransactionalExecutor business) throws Throwable {
// 如果上游已经有 xid 传过来说明自己是下游,直接参与到这个全局事务中就可以,不必新开一个,角色是 Participant
// 如果上游没有 xid 传递过来,说明自己是发起方,新开启一个全局事务,角色是 Launcher
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// …… …… 省略
try {
// 开启全局事务
beginTransaction(txInfo, tx);
Object rs = null;
try {
// 调用业务方法
rs = business.execute();
} catch (Throwable ex) {
// 如果抛异常,通知 TC 回滚全局事务
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 如果不抛异常,通知 TC 提交全局事务
commitTransaction(tx);
return rs;
}
// …… …… 省略
}
beginTransaction
метод называетсяtransactionManager
изbegin
метод:
// 客户端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// 发送 RPC,获取 TC 下发的 xid
GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
return response.getXid();
}
// 服务端
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 全局事务用 GlobalSession 来表示
GlobalSession session = GlobalSession.createGlobalSession(
applicationId, transactionServiceGroup, name, timeout);
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// 将 GlobalSession 写入文件存储
session.begin();
// 返回 UUID 作为全局事务 ID
return XID.generateXID(session.getTransactionId());
}
TwoPhaseBusinessAction зарегистрировать транзакцию филиала
Когда глобальная транзакция вызывает бизнес-метод, он войдетTCC
Аспектная логика участников в основном реализуется вTccActionInterceptor
класс, ключевой методactionInterceptorHandler
изproceed
метод.
public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
// …… …… 省略
// 创建分支事务
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
// 记录方法参数
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
// …… …… 省略
}
doTccActionLogStore
Метод отвечает за регистрацию транзакций филиала:
// 客户端
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
String actionName = actionContext.getActionName();
// 拿到全局事务 ID
String xid = actionContext.getXid();
// …… …… 省略
try {
// resourceManager 通过 RPC 向 TC 注册分支事务
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
// 拿到 TC 返回的分支事务 ID
return String.valueOf(branchId);
}
// …… …… 省略
}
// 服务端
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
// 分支事务用 BranchSession 表示,新建一个 BranchSession
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
if (!branchSession.lock()) {
throw new TransactionException(LockKeyConflict);
}
try {
// 将分支事务加入全局事务中,也会写文件
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
throw new TransactionException(FailedToAddBranch);
}
// 返回分支事务 ID
return branchSession.getBranchId();
}
Метод компенсации участника обратного звонка TC
Транзакция филиала зарегистрирована, и бизнес-метод успешно вызывается для уведомленияTC
Зафиксируйте глобальную транзакцию.
@Override
public void commit() throws TransactionException {
// 如果是参与者,无需发起提交请求
if (role == GlobalTransactionRole.Participant) {
return;
}
// 由 TM 向 TC 发出提交全局事务的请求
status = transactionManager.commit(xid);
}
TC
полученный клиентTM
изcommit
После запроса:
@Override
public GlobalStatus commit(String xid) throws TransactionException {
// 根据 xid 找出 GlobalSession
GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
if (globalSession == null) {
return GlobalStatus.Finished;
}
GlobalStatus status = globalSession.getStatus();
// 关闭这个 GlobalSession,不让后续的分支事务再注册上来
globalSession.closeAndClean();
if (status == GlobalStatus.Begin) {
// 修改状态为提交进行中
globalSession.changeStatus(GlobalStatus.Committing);
// 一旦分支事务中存在 TCC,做同步提交,其实 TCC 分支也可以异步提交,要求高性能时可以选择异步
if (globalSession.canBeCommittedAsync()) {
asyncCommit(globalSession);
} else {
doGlobalCommit(globalSession, false);
}
}
return globalSession.getStatus();
}
doGlobalCommit
это ключевой метод, на котором мы фокусируемся, и мы игнорируем вторичную логику в нем:
@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
for (BranchSession branchSession : globalSession.getSortedBranches()) {
// …… …… 省略
try {
// 调用 DefaultCoordinator 的 branchCommit 方法做分支提交
// 参数有分支事务 id,resourceId 用来寻找对应的 TCCResource 和补偿方法参数信息
BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),
XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
branchSession.getResourceId(), branchSession.getApplicationData());
}
}
// …… …… 省略
}
серверная частьDefaultCoordinator
в классеbranchCommit
метод испускаетRPC
запрос, звоните в соответствующуюTCCResource
провайдер:
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData)
throws TransactionException {
// …… …… 省略
// 获取全局事务和分支事务
GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
BranchSession branchSession = globalSession.getBranch(branchId);
// 根据 resourceId 找到对应的 channel 和 RpcContext
BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,
branchSession.getClientId(), request);
// 返回分支事务提交状态
return response.getBranchStatus();
// …… …… 省略
}
Клиент, естественно, получает фиксацию веткиRPC
запрос, а затем локально найти ранее проанализированный и сохраненныйTCCResource
Выполните вызов отражения метода компенсации. Ниже мы перехватываем ключевые шаги для анализа.
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
// 根据 resourceId 找出内存中保留的 TCCResource 对象
TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
if(tccResource == null){
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
// 获取 targetBean 和相应的 method 对象
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
try {
boolean result = false;
// 取出补偿方法参数信息
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
// 反射调用补偿方法
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
// 返回状态
return result ? BranchStatus.PhaseTwo_Committed:BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
// …… …… 省略
}
Транзакционное хранение
Что касается того, как модуль Seata TC выполняет хранение транзакций, в некоторых статьях в Интернете уже подробно объясняется, например,Углубленный анализ универсального решения для распределенных транзакций Seata-Server, поэтому здесь не повторяется.
Стоит упомянуть, что,TC
Можно стать узким местом в производительности всей службы распределенных транзакций, так как это сделать高性能
и高可用
Важно, чтобы текущее хранилищеFile
, код также имеет околоDB Store Mode
изTODO
элемент, файл сравнивается сDB
Производительность однозначно лучше, но юзабилити будет чуть хуже.Как это можно гарантированно ждать до доработки?HA Cluster
Вернитесь после публикации.
Суммировать
весьSeata
Рамка оTCC
Часть исходного кода не сложна. В этой статье только выбран ключевой код в некоторых классах для отображения, игнорируя некоторую логику суждений и обработку исключений. Автор считает, чтоSeata TCC
оTCC
Также стоит обратить внимание на инкапсуляцию исключений и пользовательскую обработку, а также на дизайн различных скрытых точек расширения пользователя.
МуравейSOFA Channel
я сделал предыдущийSeata TCC
Поделиться Seata TCCЭто также упоминается в объяснении,TCC
Сложность фреймворка не в нем самом, а в том, как написать хорошийTCC
Интерфейс, если вас интересует эта часть, вы можете щелкнуть ссылку, чтобы узнать о ней больше.
напиши в конце
Это молодой человек в халате программиста, который время от времени обновляется. Не только делиться гиковскими технологиями, но и записывать фейерверки в мире.