Принцип JTA и реализация распределенных транзакций (3)

Java задняя часть база данных API

Авторские права принадлежат автору Для любой формы перепечатки, пожалуйста, свяжитесь с автором для получения разрешения и укажите источник.

Что такое JTA

Интерфейс программирования транзакций Java (JTA: Java Transaction API) и служба транзакций Java (JTS; служба транзакций Java) предоставляют службы распределенных транзакций для платформы J2EE. Распределенная транзакция включает диспетчер транзакций и один или несколько диспетчеров ресурсов, которые поддерживают протокол XA.

Менеджер ресурсов (менеджер ресурсов)Думайте об этом как о любом типе постоянного хранилища данных.
Менеджер транзакцийОн отвечает за координацию и контроль всех участвующих подразделений фирмы.

Транзакции JTA эффективно защищают базовые ресурсы транзакций, поэтому приложения могут участвовать в обработке транзакций прозрачным образом, однако по сравнению с локальными транзакциями системные накладные расходы протокола XA велики. отдается тому, действительно ли необходимо распределять распределенные транзакции. Если вам действительно нужны распределенные транзакции для координации нескольких ресурсов транзакций, вам следует внедрить и настроить менеджер транзакций, поддерживающий протокол XA, такой как JMS, пул соединений с базой данных JDBC и т. д.

Недостатки управления транзакциями JTA

  • двухэтапная фиксация
  • Время транзакции слишком велико, и время блокировки данных слишком велико.
  • низкая производительность, низкая пропускная способность

Преимущества отказа от управления транзакциями JTA

  • Механизм синхронизации транзакций Spring
  • Приблизительная согласованность транзакций (нестрогая согласованность) между несколькими источниками данных
  • Высокая производительность, высокая пропускная способность

Пример использования JTA для обработки транзакций выглядит следующим образом.(Примечание: connA и connB — это соединения из разных баз данных)

public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection connA = null; 
        Statement stmtA = null; 
                
        Connection connB = null; 
        Statement stmtB = null; 
    
        try{ 
            ## 获得 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("\java:comp/UserTransaction"); 
            ## 从数据库 A 中取得数据库连接
            connA = getDataSourceA().getConnection(); 
            
            ## 从数据库 B 中取得数据库连接
            connB = getDataSourceB().getConnection(); 
      
            ## 启动事务
            userTx.begin();
            
            ## 将 A 账户中的金额减少 500 
            stmtA = connA.createStatement(); 
            stmtA.execute("update t_account set amount = amount - 500 where account_id = 'A'");
            
            ## 将 B 账户中的金额增加 500 
            stmtB = connB.createStatement(); 
            stmtB.execute("\update t_account set amount = amount + 500 where account_id = 'B'");
            
            ## 提交事务
            userTx.commit();
            ## 事务提交:转账的两步操作同时成功(数据库 A 和数据库 B 中的数据被同时更新)
        } catch(SQLException sqle){ 
            try{ 
                 ## 发生异常,回滚在本事务中的操纵
                 userTx.rollback();
                ## 事务回滚:转账的两步操作完全撤销 
                ##( 数据库 A 和数据库 B 中的数据更新被同时撤销)
                stmt.close(); 
                conn.close(); 
                ## 剩余业务代码 
            }catch(Exception ignore){ } 
            sqle.printStackTrace(); 
            
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }

Внешние (глобальные) транзакции — JTA

  • Внешний менеджер транзакций обеспечивает управление транзакциями JTA
  • Менеджер транзакций JTA может управлять несколькими ресурсами данных
  • Транзакции с несколькими источниками данных через двухэтапную фиксацию

Схема процесса механизма транзакции

Внешние (глобальные) транзакции (вызов JTA-менеджера приложения)

Внешние (глобальные) транзакции (вызов JTA-менеджера Spring)

XA

JTA

Принцип реализации JTA

Многих разработчиков будет интересовать внутренняя работа JTA: большую часть времени мы пишем код без какого-либо кода, который взаимодействует с транзакционными ресурсами (такими как соединения с базой данных), но мои операции (обновления базы данных) фактически содержатся внутри транзакции, как JTA достигает такой прозрачности?

Менеджер транзакций JTA и менеджер ресурсов

Чтобы понять принцип реализации JTA, сначала нужно понять его архитектуру:

  • Менеджер транзакций
  • Один или несколько менеджеров ресурсов ( Resource Manager ), которые поддерживают протокол XA.

Мы можем думать о менеджерах ресурсов как о любом типе постоянного носителя данных;
Менеджер транзакций отвечает за координацию и контроль всех подразделений, участвующих в транзакциях.

В зависимости от объектно-ориентированного подхода мы можем понимать диспетчер транзакций JTA и диспетчер ресурсов как два аспекта:

  • Ориентированный на разработчиков интерфейс использования (менеджер транзакций)
  • Интерфейс реализации для поставщика услуг (менеджер ресурсов)

Основная часть интерфейса разработки указана в приведенном выше примере.UserTransactionОбъект, разработчики реализуют распределенные транзакции в информационной системе через этот интерфейс, а интерфейс реализации используется для стандартизации транзакционных услуг, предоставляемых провайдерами (такими как провайдеры подключения к базе данных), что предусматривает функцию управления ресурсами транзакций, чтобы JTA мог выполнять совместная коммуникация между разнородными транзакционными ресурсами. Если взять в качестве примера базы данных, IBM предоставляет драйверы баз данных для реализации распределенных транзакций, а Oracle также предоставляет драйверы баз данных для реализации распределенных транзакций. При одновременном использовании соединений с базами данных DB2 и Oracle JTA может основываться на согласованном интерфейсе. ресурсы транзакций для достижения распределенных транзакций. Именно различные реализации, основанные на единой спецификации, позволяют JTA координировать и контролировать ресурсы транзакций различных баз данных или поставщиков JMS.Архитектура JTAКак показано ниже:

Разработчики используют интерфейс разработчика для реализации поддержки приложением глобальных транзакций; каждый провайдер (база данных, JMS и т. д.) предоставляет функции управления ресурсами транзакций в соответствии со спецификациями интерфейса провайдера; диспетчер транзакций (TransactionManager) будет применять распределенную транзакцию. сопоставляется с фактическими ресурсами транзакций, а также координирует и контролирует ресурсы транзакций. Ниже эта статья будет включатьUserTransaction,TransactionиTransactionManagerПредставлены три основных интерфейса и определенные для них методы.

Пользовательская транзакция и транзакция

Интерфейсом для разработчиков является UserTransaction (метод использования показан в приведенном выше примере), и разработчики обычно используют этот интерфейс только для реализации управления транзакциями JTA, который определяет следующие методы:

  • begin() — запустить распределенную транзакцию (в фоновом режиме TransactionManager создаст объект транзакции Transaction и свяжет этот объект с текущим потоком через * ThreadLocale)
  • commit () — зафиксировать транзакцию (в фоновом режиме TransactionManager достанет объект транзакции из текущего потока и зафиксирует транзакцию, представленную этим объектом)
  • rollback() - откатывает транзакцию (в фоновом режиме TransactionManager вынимает объект транзакции из текущего потока и откатывает транзакцию, представленную этим объектом)
  • getStatus() — возвращает статус распределенной транзакции, связанной с текущим потоком (объект Status определяет все статусы транзакций, заинтересованные читатели могут обратиться к документации по API)
  • setRollbackOnly() — указывает, что распределенная транзакция, связанная с текущим потоком, будет отброшена.

Интерфейс реализации, ориентированный на провайдера, в основном включает два объекта: TransactionManager и Transaction;

Транзакция представляет собой физическую транзакцию. Когда разработчик вызывает метод UserTransaction.begin(), TransactionManager создает объект транзакции Transaction (отмечая начало транзакции) и связывает этот объект с текущим потоком через ThreadLocale. Методы commit(), rollback(), getStatus() и другие методы интерфейса UserTransaction в конечном итоге будут делегированы соответствующим методам класса Transaction для выполнения.

Интерфейс Transaction определяет следующие методы:

  • commit () — координировать различные ресурсы транзакции для завершения фиксации транзакции.
  • rollback() — координирует различные ресурсы транзакции для завершения отката транзакции.
  • setRollbackOnly() — указывает, что распределенная транзакция, связанная с текущим потоком, будет отброшена.
  • getStatus() - возвращает статус распределенной транзакции, связанной с текущим потоком
  • enListResource(XAResource xaRes, int flag) — добавить ресурс транзакции в текущую транзакцию (в приведенном выше примере ресурс транзакции, который он представляет, будет связан с текущей транзакцией при работе с базой данных A и аналогичным образом при работе с базой данных B). ресурс транзакции, который он представляет, также будет связан с текущей транзакцией во время операции)
  • delistResourcec(XAResource xaRes, int flag) — удалить ресурс транзакции из текущей транзакции
  • registerSynchronization (Синхронизация синхронизации) - интерфейс обратного вызова

Инструменты ORM, такие как Hibernate, имеют свой собственный механизм контроля транзакций для обеспечения транзакций, но им также нужен механизм обратного вызова, который будет уведомлен о завершении транзакции, чтобы инициировать некоторую обработку, например очистку кеша и т. д. Это включает в себя интерфейс обратного вызова registerSynchronization of Transaction. Инструмент может внедрить программу обратного вызова в транзакцию через этот интерфейс.Когда транзакция будет успешно отправлена, программа обратного вызова будет активирована. Сам TransactionManager не берет на себя фактическую функцию обработки транзакций, он действует скорее как мост между пользовательским интерфейсом и интерфейсом реализации. Методы, определенные в TransactionManager, перечислены ниже, и вы можете видеть, что большинство методов транзакций в этом интерфейсе такие же, как UserTransaction и Transaction.
Когда разработчик вызывает метод UserTransaction.begin(), TransactionManager создаст объект транзакции Transaction (отмечающий начало транзакции) и свяжет этот объект с текущим потоком через ThreadLocale; аналогично UserTransaction.commit() вызовет TransactionManager.commit () , метод вынесет объект транзакции Transaction из текущего потока и зафиксирует транзакцию, представленную этим объектом, то есть вызовет Transaction.commit() TransactionManager определяет следующие методы:

  • begin() - запускает транзакцию
  • commit() - зафиксировать транзакцию
  • rollback() - откатывает транзакцию
  • getStatus() - возвращает текущий статус транзакции
  • setRollbackOnly()
  • getTransaction() — возвращает транзакцию, связанную с текущим потоком
  • setTransactionTimeout(целое число секунд) — Установить время ожидания транзакции
  • возобновить(транзакция tobj) - возобновить транзакцию, связанную с текущим потоком
  • suspend() - приостанавливает транзакцию, связанную с текущим потоком

В процессе развития системы будут операции, требующие временного исключения ресурсов транзакций, в это время необходимо вызватьsuspend()метод приостанавливает текущую транзакцию: любые операции, выполненные после этого метода, не будут включены в транзакцию, вызываемую после завершения нетранзакционной операции.resume()Чтобы продолжить транзакцию (Примечание: для выполнения этой операции необходимо получить объект TransactionManager, который отличается на разных серверах приложений J2EE)

Нижеследующее познакомит читателей с принципом реализации JTA через специальные коды. На следующем рисунке перечислены классы Java, используемые в примере реализации, гдеUserTransactionImplДостигнутоUserTransactionинтерфейс,TransactionManagerImplДостигнутоTransactionManagerинтерфейс,TransactionImplДостигнутоTransactionинтерфейс

Диаграмма класса реализации JTA:

## 开始事务
public class UserTransactionImpl implenments UserTransaction{
    public void begin() throws NotSupportedException, SystemException { 
     ## 将开始事务的操作委托给 TransactionManagerImpl 
         TransactionManagerImpl.singleton().begin(); 
     }
}
## 开始事务
public class TransactionManagerImpl implements TransactionManager{
## 此处 transactionHolder 用于将 Transaction 所代表的事务对象关联到线程上
    private static ThreadLocal<TransactionImpl> transactionHolder = new ThreadLocal<TransactionImpl>(); 
    ## TransacationMananger 必须维护一个全局对象,因此使用单实例模式实现
     private static TransactionManagerImpl singleton = new TransactionManagerImpl(); 
     
     private TransactionManagerImpl(){ } 
     
     public static TransactionManagerImpl singleton(){ 
         return singleton; 
     } 
 
     public void begin() throws NotSupportedException, SystemException { 
         ## XidImpl 实现了 Xid 接口,其作用是唯一标识一个事务
         XidImpl xid = new XidImpl(); 
         ## 创建事务对象,并将对象关联到线程
         TransactionImpl tx = new TransactionImpl(xid); 
         transactionHolder.set(tx); 
     }
}

Теперь мы можем понять, почему на интерфейсе Transaction не определен метод begin: сам объект Transaction представляет собой транзакцию, и при его создании он указывает, что транзакция началась, поэтому нет необходимости определять дополнительный метод begin. () метод.

##提交事务
public class UserTransactionImpl implenments UserTransaction{
    public void commit() throws RollbackException, HeuristicMixedException, 
        HeuristicRollbackException, SecurityException, 
        IllegalStateException, SystemException { 
        
        // 检查是否是 Roll back only 事务,如果是回滚事务
           if(rollBackOnly){ 
            rollback(); 
            return; 
          } else { 
           // 将提交事务的操作委托给 TransactionManagerImpl 
           TransactionManagerImpl.singleton().commit(); 
          } 
    }
}
## 提交事务
public class TransactionManagerImpl implenments TransactionManager{
public void commit() throws RollbackException, HeuristicMixedException, 
    HeuristicRollbackException, SecurityException, 
    IllegalStateException, SystemException { 
       ## 取得当前事务所关联的事务并通过其 commit 方法提交
       TransactionImpl tx = transactionHolder.get(); 
       tx.commit(); 
    }
}

Точно так же методы rollback, getStatus, setRollbackOnly и другие реализованы так же, как и commit(). Объект UserTransaction не имеет никакого контроля над транзакцией, и все методы транзакции передаются фактическому ресурсу транзакции, а именно объекту Transaction, через TransactionManager.
Приведенный выше пример демонстрирует обработку транзакции JTA, следующий покажет вам, как ресурсы транзакции (подключения к базе данных, JMS) прозрачно участвуют в транзакции JTA. Прежде всего, должно быть ясно, что,Источник базы данных (DataSource), полученный в коде транзакции JTA, должен поддерживать распределенные транзакции.. В следующем примере кода, хотя все операции с базой данных содержатся в транзакциях JTA, поскольку подключение к базе данных MySql получается локально, любые обновления MySql не будут автоматически содержаться в глобальных транзакциях.

## JTA 事务处理
public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection mySqlConnection = null; 
        Statement mySqlStat = null; 
        Connection connB = null; 
        Statement stmtB = null; 
    
        try{ 
            ## 获得 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction");
            ## 以本地方式获得 mySql 数据库连接
            mySqlConnection = DriverManager.getConnection("localhost:1111"); 
            ## 从数据库 B 中取得数据库连接, getDataSourceB 返回应用服务器的数据源
            connB = getDataSourceB().getConnection(); 
            ## 启动事务
            userTx.begin();
            ## 将 A 账户中的金额减少 500 
            ## mySqlConnection 是从本地获得的数据库连接,不会被包含在全局事务中
            mySqlStat = mySqlConnection.createStatement(); 
            mySqlStat.execute("
            update t_account set amount = amount - 500 where account_id = 'A'");
            ## connB 是从应用服务器得的数据库连接,会被包含在全局事务中
            stmtB = connB.createStatement(); 
            stmtB.execute("update t_account set amount = amount + 500 where account_id = 'B'");
            ## 事务提交:connB 的操作被提交,mySqlConnection 的操作不会被提交
            userTx.commit();
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }

Почему подключение к базе данных из источника данных с поддержкой транзакций должно поддерживать распределенные транзакции?

  • На самом деле источник данных, поддерживающий транзакции, отличается от обычного источника данных тем, что в нем реализованы дополнительныеXADataSourceинтерфейс. Мы можем просто понимать XADataSource как обычный источник данных (наследующий java.sql.PooledConnection), но он добавляетgetXAResourceметод. Кроме того, соединение с базой данных, возвращаемое XADataSource, также отличается от обычного соединения.Это соединение реализует все функции, определенные в java.sql.Connection.XAConnectionинтерфейс. Мы можем понимать XAConnection как обычное соединение с базой данных, которое поддерживает все операции с базой данных спецификации JDBC, разница в том, чтоXAConnection добавляет поддержку распределенных транзакций.

Диаграмма классов ресурсов транзакций

Соединение с базой данных, полученное приложением из источника данных, который поддерживает распределенные транзакции, является реализацией интерфейса XAConnection, и сеанс (оператор), созданный этим соединением с базой данных, также добавляет функциональные возможности для поддержки распределенных транзакций, как показано в следующем коде:

## JTA 事务资源处理
public void transferAccount() { 
        UserTransaction userTx = null; 
        Connection conn = null; 
        Statement stmt = null; 
        
        try{ 
            ##获得 Transaction 管理对象
            userTx = (UserTransaction)getContext().lookup("java:comp/UserTransaction"); 
            ##从数据库中取得数据库连接, getDataSourceB 返回支持分布式事务的数据源
            conn = getDataSourceB().getConnection(); 
            ## 会话 stmt 已经为支持分布式事务进行了功能增强
            stmt = conn.createStatement(); 
            ## 启动事务
            userTx.begin();
            stmt.execute("update t_account ... where account_id = 'A'"); 
            userTx.commit();
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    }

Давайте взглянем на реализацию кода части Session (Statement), созданной соединением с базой данных XAConnection (разные поставщики JTA имеют разные реализации, пример кода здесь просто показывает, как ресурсы транзакции автоматически добавляются в транзакцию). мывыполнить с объектом сеансаметод, например, поДобавление вызова метода AssociateWithTransactionIfNecessary в начале метода гарантирует, что во время транзакции JTA операции с любым подключением к базе данных будут прозрачно добавлены в транзакцию..

##将事务资源自动关联到事务对象
public class XAStatement implements Statement{
    public void execute(String sql) { 
        ## 对于每次数据库操作都检查此会话所在的数据库连接是否已经被加入到事务中
        associateWithTransactionIfNecessary(); 
        try{ 
          ## 处理数据库操作的代码
          .... 
        } catch(SQLException sqle){ 
            ## 处理异常代码
        } catch(Exception ne){ 
            e.printStackTrace(); 
        } 
    } 
    public void associateWithTransactionIfNecessary(){ 
    ## 获得 TransactionManager 
        TransactionManager tm = getTransactionManager(); 
        Transaction tx = tm.getTransaction();
        ## 检查当前线程是否有分布式事务
        if(tx != null){ 
        ## 在分布式事务内,通过 tx 对象判断当前数据连接是否已经被包含在事务中,如果不是那么将此连接加入到事务中
        Connection conn = this.getConnection(); 
        ##tx.hasCurrentResource, xaConn.getDataSource() 不是标准的 JTA 
        ## 接口方法,是为了实现分布式事务而增加的自定义方法
        if(!tx.hasCurrentResource(conn)){ 
            XAConnection xaConn = (XAConnection)conn; 
            XADataSource xaSource = xaConn.getDataSource(); 
            ## 调用 Transaction 的接口方法,将数据库事务资源加入到当前事务中
            tx.enListResource(xaSource.getXAResource(), 1);
            } 
        } 
   }
}

XAResource и Xid

  • XAResource — это Java-реализация стандарта Distributed Transaction Processing: The XA Specification.Это абстракция базовых ресурсов транзакций и определяет протокол между диспетчером транзакций и диспетчером ресурсов в процессе распределенной обработки транзакций.Каждый поставщик ресурсов транзакций ( например драйвер JDBC, JMS) обеспечит реализацию этого интерфейса. Используя этот интерфейс, разработчики могут реализовывать распределенную обработку транзакций с помощью собственного программирования, но обычно это реализуется сервером приложений (собственная реализация сервера более эффективна и стабильна).
  • Xid: перед использованием распределенных транзакций, чтобы различать транзакции, чтобы их не путали и не использовали для идентификации транзакций, Xid можно рассматривать как идентификатор транзакции.Каждый раз, когда создается новая транзакция, Xid будет назначен к транзакции Xid содержит три элемента:formatID,gtrid (глобальный идентификатор транзакцииbqual (идентификатор модификатора ветки). formatID обычно равен нулю, что означает, что вы будете использовать OSI CCR (стандарт Open Systems Interconnection Commitment, Concurrency and Recovery) для именования; если вы хотите использовать другой формат, formatID должен быть больше нуля, а значение -1 означает, что Xid недействителен. gtrid и bqual содержат 64 байта двоичного кода для идентификации глобальных транзакций и ветвей транзакций соответственно, единственным требованием являетсяgtrid и bqual должны быть глобально уникальными.

Интерфейс XAResource в основном определяет следующие методы:

  • commit() - зафиксировать транзакцию
  • isSameRM(XAResource xares) — проверяет, являются ли текущий XAResource и параметр одним и тем же ресурсом транзакции.
  • prepare() - уведомляет менеджера ресурсов о необходимости подготовить фиксацию транзакции.
  • rollback() — уведомляет менеджера ресурсов о необходимости отката транзакции.

Когда транзакция зафиксирована, объект Transaction соберет все ресурсы XAResource, содержащиеся в текущей транзакции, а затем вызовет метод фиксации ресурса, как показано в следующем коде:

## 提交事务
public class TransactionImpl implements Transaction{
    public void commit() throws RollbackException, HeuristicMixedException, 
         HeuristicRollbackException, SecurityException, 
         IllegalStateException, SystemException { 
         ## 得到当前事务中的所有事务资源
         List<XAResource> list = getAllEnlistedResouces(); 
         ## 通知所有的事务资源管理器,准备提交事务
         ## 对于生产级别的实现,此处需要进行额外处理以处理某些资源准备过程中出现的异常
         for(XAResource xa : list){ 
             xa.prepare(); 
         } 
         ## 所有事务性资源,提交事务
         for(XAResource xa : list){ 
             xa.commit(); 
         } 
   }
}

Благодаря приведенному выше введению я полагаю, что читатели имеют некоторое представление о принципе JTA.Пример кода в этой статье является идеальной гипотетической реализацией. Полная и зрелая реализация транзакции JTA должна учитывать и учитывать многие детали, такие как производительность (многопоточная параллельная отправка транзакций при отправке транзакций), отказоустойчивость (сеть, системные исключения) и т. д. зрелое накопление. Заинтересованные читатели могут прочитать некоторые реализации JTA с открытым исходным кодом для дальнейшего углубленного изучения.

Информационная рекомендация

Реализация распределенных транзакций JTA с помощью Atomikos
JTA Deep Adventure — принцип и реализация
4.0 Глобальные транзакции Atomikos JTA/XA