SpringBoot+Mybatis настраивает несколько источников данных и схем транзакций.

Java

предисловие

Возможно, из-за некоторых бизнес-требований нашей системе иногда необходимо подключить несколько баз данных, что приводит к проблеме нескольких источников данных.

В случае нескольких источников данных, как правило, мы должны выполнять автоматическое переключение.В это время аннотация транзакции Transactional не вступает в силу, и будет задействована проблема распределенной транзакции.

Что касается решения с несколькими источниками данных, то автор видел несколько примеров в Интернете, но большинство из них являются неправильными примерами, которые вообще не могут быть запущены или несовместимы с транзакциями.

Сегодня мы немного разберем первопричины этих проблем и соответствующие решения.

1. Несколько источников данных

Для плавного развития сюжета наш смоделированный бизнес заключается в создании заказов и вычете инвентаря.

Итак, давайте сначала создадим таблицу заказов и таблицу инвентаризации. Обратите внимание, поместите их в две базы данных отдельно.

CREATE TABLE `t_storage` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order` (
  `id` bigint(16) NOT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  `amount` double(14,2) DEFAULT '0.00',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

1. Подключение к базе данных

Сначала настройте обе базы данных через файл YML.

spring:
  datasource:
    ds1:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db1
      username: root
      password: root
    ds2:
      jdbc_url: jdbc:mysql://127.0.0.1:3306/db2
      username: root
      password: root

2. Настройте источник данных

Мы знаем, что когда Mybatis выполняет оператор SQL, он должен сначала получить соединение. В это время он передается менеджеру Spring для получения соединения от DataSource.

В Spring есть DataSource с функцией маршрутизации, которая может вызывать разные источники данных путем поиска ключей, чтоAbstractRoutingDataSource.

public abstract class AbstractRoutingDataSource{
    //数据源的集合
    @Nullable
    private Map<Object, Object> targetDataSources;
    //默认的数据源
    @Nullable
    private Object defaultTargetDataSource;
	
    //返回当前的路由键,根据该值返回不同的数据源
    @Nullable
    protected abstract Object determineCurrentLookupKey();
    
    //确定一个数据源
    protected DataSource determineTargetDataSource() {
        //抽象方法 返回一个路由键
        Object lookupKey = determineCurrentLookupKey();
        DataSource dataSource = this.targetDataSources.get(lookupKey);
        return dataSource;
    }
}

Можно видеть, что ядро ​​этого абстрактного класса состоит в том, чтобы сначала установить несколько источников данных в коллекцию Map, а затем получить разные источники данных в соответствии с ключом.

Затем мы можем переопределить метод defineCurrentLookupKey, который возвращает имя источника данных.

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
        return dataBaseType;
    }
}

Затем вам также понадобится класс инструмента для сохранения типа источника данных текущего потока.

public class DataSourceType {

    public enum DataBaseType {
        ds1, ds2
    }
    // 使用ThreadLocal保证线程安全
    private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
    // 往当前线程里设置数据源类型
    public static void setDataBaseType(DataBaseType dataBaseType) {
        if (dataBaseType == null) {
            throw new NullPointerException();
        }
        TYPE.set(dataBaseType);
    }
    // 获取数据源类型
    public static DataBaseType getDataBaseType() {
        DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
        return dataBaseType;
    }
}

После того, как все это будет сделано, нам также нужно настроить этот источник данных в контейнере Spring. Следующий класс конфигурации делает следующее:

  • Создание нескольких источников данных DataSource, ds1 и ds2;
  • Поместите источники данных ds1 и ds2 в динамический источник данных DynamicDataSource;
  • Вставьте DynamicDataSource в SqlSessionFactory.
@Configuration
public class DataSourceConfig {

    /**
     * 创建多个数据源 ds1 和 ds2
     * 此处的Primary,是设置一个Bean的优先级
     * @return
     */
    @Primary
    @Bean(name = "ds1")
    @ConfigurationProperties(prefix = "spring.datasource.ds1")
    public DataSource getDateSource1() {
        return DataSourceBuilder.create().build();
    }
    @Bean(name = "ds2")
    @ConfigurationProperties(prefix = "spring.datasource.ds2")
    public DataSource getDateSource2() {
        return DataSourceBuilder.create().build();
    }


    /**
     * 将多个数据源注入到DynamicDataSource
     * @param dataSource1
     * @param dataSource2
     * @return
     */
    @Bean(name = "dynamicDataSource")
    public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
                                        @Qualifier("ds2") DataSource dataSource2) {
        Map<Object, Object> targetDataSource = new HashMap<>();
        targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
        targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
        DynamicDataSource dataSource = new DynamicDataSource();
        dataSource.setTargetDataSources(targetDataSource);
        dataSource.setDefaultTargetDataSource(dataSource1);
        return dataSource;
    }
    
    
    /**
     * 将动态数据源注入到SqlSessionFactory
     * @param dynamicDataSource
     * @return
     * @throws Exception
     */
    @Bean(name = "SqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dynamicDataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
        bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
        return bean.getObject();
    }
}

3. Установите ключ маршрутизации

После того, как описанная выше конфигурация завершена, нам также необходимо найти способ динамического изменения значения ключа источника данных, связанного с бизнесом системы.

Например, здесь у нас есть два интерфейса Mapper, создающие заказы и вычитающие запасы.

public interface OrderMapper {
    void createOrder(Order order);
}
public interface StorageMapper {
    void decreaseStorage(Order order);
}

Затем мы можем создать аспект, при выполнении операции заказа переходить к источнику данных ds1, при выполнении операции инвентаризации — к источнику данных ds2.

@Component
@Aspect
public class DataSourceAop {
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
    public void setDataSource1() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
    }
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
    public void setDataSource2() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
    }
}

4. Тест

Теперь вы можете написать метод службы и протестировать его через интерфейс REST.

public class OrderServiceImpl implements OrderService {
    @Override
    public void createOrder(Order order) {
        storageMapper.decreaseStorage(order);
        logger.info("库存已扣减,商品代码:{},购买数量:{}。创建订单中...",order.getCommodityCode(),order.getCount());
        orderMapper.createOrder(order);
    }
}

По крайней мере, после завершения бизнес-процесса таблицы двух баз данных изменились.

Но сейчас мы будем думать, что эти две операции должны гарантировать атомарность. Поэтому нам нужно полагаться на транзакции и пометить Transactional в методе Service.

Если мы добавим аннотацию Transactional в метод createOrder, а затем запустим код, будет выдано исключение.

### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist
; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: 
    Table 'db2.t_order' doesn't exist] with root cause

Это означает, что если транзакция Spring добавлена, наш источник данных не может быть переключен. Что тут происходит?

Во-вторых, режим транзакций, почему нельзя переключить источник данных

Чтобы выяснить причину, мы должны проанализировать и проанализировать, добавляется ли транзакция Spring, что она делает?

Мы знаем, что автоматические транзакции Spring реализованы на основе АОП. Когда вызывается метод, содержащий транзакцию, вводится перехватчик.

public class TransactionInterceptor{
    public Object invoke(MethodInvocation invocation) throws Throwable {
        //获取目标类
        Class<?> targetClass = AopUtils.getTargetClass(invocation.getThis());
        //事务调用
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }
}

1. Создайте транзакцию

В нем первым делом нужно приступить к созданию транзакции.

protected Object doGetTransaction() {
    //DataSource的事务对象
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    //设置事务自动保存
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    //给事务对象设置ConnectionHolder
    ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

На этом шаге основное внимание уделяется установке свойства ConnectionHolder для объекта транзакции, но в настоящее время оно все еще пусто.

2. Начать бизнес

Следующим шагом является запуск транзакции, здесь ресурс привязывается к текущему объекту транзакции через ThreadLocal, а затем устанавливаются некоторые состояния транзакции.

protected void doBegin(Object txObject, TransactionDefinition definition) {
    
    Connection con = null;
    //从数据源中获取一个连接
    Connection newCon = obtainDataSource().getConnection();
    //重新设置事务对象中的connectionHolder,此时已经引用了一个连接
    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    //将这个connectionHolder标记为与事务同步
    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    con = txObject.getConnectionHolder().getConnection();
    con.setAutoCommit(false);
    //激活事务活动状态
    txObject.getConnectionHolder().setTransactionActive(true);
    //将connection holder绑定到当前线程,通过threadlocal
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    }
    //事务管理器,激活事务同步状态
    TransactionSynchronizationManager.initSynchronization();
}

3. Запустите интерфейс Mapper

После запуска транзакции выполняется реальный метод целевого класса. Здесь он начнет вводить прокси-объект Mybatis. . Ха-ха, фреймворк — это просто множество агентов.

Мы знаем, что Mybatis должен получить объект SqlSession перед выполнением SQL.

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
                PersistenceExceptionTranslator exceptionTranslator) {

    //从ThreadLocal中获取SqlSessionHolder,第一次获取不到为空
    SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);
    
    //如果SqlSessionHolder为空,那也肯定获取不到SqlSession;
    //如果SqlSessionHolder不为空,直接通过它来拿到SqlSession
    SqlSession session = sessionHolder(executorType, holder);
    if (session != null) {
        return session;
    }
    //创建一个新的SqlSession
    session = sessionFactory.openSession(executorType);
    //如果当前线程的事务处于激活状态,就将SqlSessionHolder绑定到ThreadLocal
    registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
    return session;
}

После получения SqlSession он начинает вызывать исполнителя Mybatis для подготовки к выполнению оператора SQL. Прежде чем выполнять SQL, конечно, вам нужно сначала получить соединение Connection.

public Connection getConnection() throws SQLException {
    //通过数据源获取连接
    //比如我们配置了多数据源,此时还会正常切换
    if (this.connection == null) {
        openConnection();
    }
    return this.connection;
}

Смотрим на метод openConnection, его роль заключается в получении подключения Connection от источника данных. Если мы настроим несколько источников данных, мы можем в это время нормально переключаться. Если транзакция добавлена, причина, по которой источник данных не переключается, заключается в том, что при выполнении второго вызоваthis.connection != null, возвращается последнее соединение.

Это связано с тем, что когда SqlSession получен во второй раз, текущий поток получается из ThreadLocal, поэтому соединение Connection не будет получено повторно.

До сих пор в случае нескольких источников данных, если добавляются транзакции Spring, мы все должны понимать причину, по которой источник данных не может быть динамически переключен.

Здесь автор вставляет вопрос интервью:

  • Как Spring гарантирует транзакции?

То есть поместить несколько бизнес-операций в одно и то же соединение с базой данных и отправить или откатить их вместе.

  • Как это сделать, все в одном подключении?

Вот использование различных ThreadlLocals, пытающихся связать ресурсы базы данных и текущие транзакции вместе.

3. Режим транзакций, как поддерживать переключение источников данных

Мы разобрались с причинами выше, а затем посмотрим, как поддержать его для динамического переключения источников данных.

При неизменности других конфигураций нам нужно создать два разных sqlSessionFactory.

@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

Затем настройте CustomSqlSessionTemplate для замены исходного sqlSessionTemplate в Mybatis и внедрите два SqlSessionFactory, определенные выше.

@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
    Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("ds1",factory1);
    sqlSessionFactoryMap.put("ds2",factory2);
    CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
    customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
    customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
    return customSqlSessionTemplate;
}

В определяемом CustomSqlSessionTemplate все остальное тоже самое, в основном зависит от способа получения SqlSessionFactory.

public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        //当前数据源的名称
        String currentDsName = DataSourceType.getDataBaseType().name();
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        }
        return this.sqlSessionFactory;
    }
}

Здесь дело в том, что мы можем получить разные SqlSessionFactory в соответствии с разными источниками данных. Если SqlSessionFactory отличается, то при получении SqlSession он не будет получен в ThreadLocal, поэтому каждый раз это новый объект SqlSession.

Поскольку SqlSession не один и тот же, при получении подключения Connection он каждый раз будет обращаться к источнику динамических данных, чтобы получить его.

Принцип такой принцип, прогуляемся.

После изменения конфигурации мы добавляем аннотацию транзакции в метод службы, и в это время данные могут обновляться в обычном режиме.

@Transactional
@Override
public void createOrder(Order order) {
    storageMapper.decreaseStorage(order);
    orderMapper.createOrder(order);
}

Возможность переключения источников данных — это только первый шаг, нам нужны гарантии того, что транзакционные операции могут быть гарантированы. Если в приведенном выше коде вычет инвентаря завершен, но создание заказа не удалось, инвентарь не будет откатываться. Поскольку они относятся к разным источникам данных, это совсем не одно и то же соединение.

4. Распределенные транзакции протокола XA

Для решения вышеуказанной проблемы мы можем рассмотреть только протокол XA.

Что касается протокола XA, то автор не будет его слишком подробно описывать. Нам просто нужно знать, что механизм хранения MySQL InnoDB поддерживает транзакции XA.

Тогда реализация протокола XA называется Java Transaction Manager на Java, или сокращенно JTA.

Как внедрить JTA? Мы используем фреймворк Atomikos, чтобы сначала представить его зависимости.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

Затем просто измените объект DataSource на AtomikosDataSourceBean.

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){
    Properties prop = build(env,prefix);
    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
    ds.setUniqueResourceName(dataSourceName);
    ds.setXaProperties(prop);
    return ds;
}

После этой конфигурации, когда соединение Connection получено, объект MysqlXAConnection фактически получен. При отправке или откате используется протокол XA MySQL.

public void commit(Xid xid, boolean onePhase) throws XAException {
    //封装 XA COMMIT 请求
    StringBuilder commandBuf = new StringBuilder(300);
    commandBuf.append("XA COMMIT ");
    appendXid(commandBuf, xid);
    try {
        //交给MySQL执行XA事务操作
        dispatchCommand(commandBuf.toString());
    } finally {
        this.underlyingConnection.setInGlobalTx(false);
    }
}

Внедрив Atomikos и изменив DataSource, в случае нескольких источников данных несколько баз данных можно будет нормально откатить, даже если во время бизнес-операций произойдет ошибка.

Еще вопрос, стоит ли использовать протокол XA?

Протокол XA выглядит относительно просто, но у него есть и недостатки. Например:

  • Проблемы с производительностью, все участники находятся в состоянии синхронной блокировки на этапе фиксации транзакции, занимая системные ресурсы, легко вызывая узкие места в производительности и неспособные удовлетворить сценарии с высокой степенью параллелизма;
  • Если координатор имеет единую точку отказа, в случае отказа координатора участники всегда будут заблокированы;
  • Репликация master-slave может привести к несогласованному состоянию транзакции.

Некоторые ограничения протокола XA также перечислены в официальной документации MySQL:

https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html

Кроме того, автор никогда не использовал его в реальных проектах.Этот метод используется для решения проблемы распределенных транзакций.В этом примере обсуждается только технико-экономическое обоснование плана.

Суммировать

В этой статье анализируются следующие проблемы, представляя сценарий SpringBoot + Mybatis с несколькими источниками данных:

  • Настройка и реализация нескольких источников данных;
  • Spring режим транзакций, причины и решения неэффективности нескольких источников данных;
  • Несколько источников данных, реализация распределенных транзакций на основе протокола XA.

Из-за нехватки места примеры в этой статье содержат не весь код. При необходимости перейдите на GitHub, чтобы забрать его.

https://github.com/taoxun/multipledb2.git

Нелегко быть оригинальным, это понравится приглашенным официальным лицам перед отъездом.Это будет движущей силой для автора продолжать писать~