Инвентаризация Sharding-JDBC: разделение чтения и записи

Java
Инвентаризация Sharding-JDBC: разделение чтения и записи

Общая документация:Каталог статей
Github : github.com/black-ant

Введение

Я не собирался смотреть на код, который разделяет чтение и запись., но... хе-хе-хе опять проблема.После попытки настроить разделение чтения-записи по официальной документации ожидаемого результата не появилось.

Но к счастью, может запуститься, велика вероятность, что логика разделения чтения-записи не сработает

2. Анализ исходного кода

Самая большая трудность с предыдущей статьей в том, что я не знаю, с чего начать...

В это время выходит роль официального документа, он мне говорит, что есть интерфейсShardingSphereAlgorithm, поэтому ищите его класс реализации и находите что-то

Sharding-jdbc-balance.png

можно увидеть,Разделение чтения-записи также играет роль в балансировке нагрузки в Sharding, и соответствующий интерфейс — ReplicaLoadBalanceAlgorithm.

2.1 Поиск записей

Сначала посмотрим, что нужно реализовать в интерфейсе

// 接口中提供了一个待实现的方法
public interface ReplicaLoadBalanceAlgorithm extends ShardingSphereAlgorithm {

    // name : 查询逻辑数据源名称
    // primaryDataSourceName : 主要数据源的名称
    // replicaDataSourceNames : 副本数据源的名称
    String getDataSource(String name, String primaryDataSourceName, List<String> replicaDataSourceNames);
}

Мы случайным образом выбираем алгоритм ShardingSphereAlgorithm, чтобы обратить процесс вспять, и, наконец, находим класс

  • ReplicaQueryDataSourceRouter # route
  • ReplicaQuerySQLRouter # createRouteContext
  • PartialSQLRouteExecutor # route

Давайте обратим этот процесс:

Шаг Завершение: Окончательная обработка правила ReplicaQueryDataSourceRouter.

C55- ReplicaQueryDataSourceRouter
    M55_01- route(final SQLStatement sqlStatement)
    	?- route 规则类 , 干了如下2件事
    	- 是否为 isPrimaryRoute , 如果是 , 直接获取 getPrimaryDataSourceName
    	- 如果不是 , 通过规则获取 rule.getLoadBalancer()
        
// PS : 打个断点发现和预料的一样 , 并没有执行相关的逻辑 , 继续往上推导

C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
	M101_01- generateExecutionContext
    

Попутно я нашел основной класс обработкиKernelProcessor, когда попадешь сюда, то в принципе можешь быть уверен, иначе пройдетShardingSpherePreparedStatementиметь дело с

2.2 Сортировка прямого процесса

Зная основной процесс, мы можем вывести прямой процесс:

  1. C74- ShardingSpherePreparedStatement
  2. C101- KernelProcessor
    • M101_01- generateExecutionContext
  3. C102- SQLRouteEngine
    • route
  4. C103- PartialSQLRouteExecutor
    • route
  5. C60- ReplicaQuerySQLRouter
    • createRouteContext -- наконец добрался до сути

Шаг 1: ShardingSpherePreparedStatement создает ExecutionContext

Перейдите непосредственно к главному входу для отладки: ShardingSpherePreparedStatement, есть 2 основных метода executeQuery() / executeUpdate()

В прошлый раз я сказал executeUpdate, на этот раз я в основном использую executeQuery

C74- ShardingSpherePreparedStatement
    M74_10- executeQuery
    	1- 创建 createExecutionContext()  -> M101_01
    	2- 通过 getInputGroups() 获取 InputGroup 集合
    	3- reparedStatementExecutor.executeQuery(inputGroups) 执行 Query
    	4- 合并结果
    	5- 构建 ShardingSphereResultSet 返回
	M74_11- createExecutionContext
    	1- createLogicSQL() 构建逻辑 SQL , 即需要查询的 SQL  -> PS : 
		2- 通过 KernelProcessor 构建一个 ExecutionContext
            ?- 这里就和上文关联起来了 -> 
            
// PS:M74_11 createLogicSQL 创建的 SQL 
select blogentity0_.id as id1_0_, blogentity0_.author as author2_0_, blogentity0_.column_id as column_i3_0_, blogentity0_.date as date4_0_, blogentity0_.title as title5_0_, blogentity0_.title_id as title_id6_0_ from t_blog_0 blogentity0_   


// M74_10 源代码
public ResultSet executeQuery() throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        // 核心处理 , 此处已经把需要处理的 SQL 放入容器中
        executionContext = createExecutionContext();
        List<QueryResult> queryResults;
        if (ExecutorConstant.MANAGED_RESOURCE) {
                Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
                cacheStatements(inputGroups);
                reply();
                queryResults = preparedStatementExecutor.executeQuery(inputGroups);
        } else {
                queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
        }
        MergedResult mergedResult = mergeQuery(queryResults);
        result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
    } finally {
        clearBatch();
    }
    currentResultSet = result;
    return result;
}

Это основная якорная точка. Как только эта точка будет найдена, ее будет легко проанализировать позже.

PS: Последующий общий процесс такой же, как и раньше, в основном следующий процесс

Шаг 2: KernelProcessor строит основную логику процесса ExecutionContext.


C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
	M101_01- generateExecutionContext
     	1- 获取 rules 集合
            ?- 此处解决了第一个配置问题 , 详见-> PS:M101_01_01
            ?- 配置改好后 , 可以看到 rule 已经正常了 -> PS:M101_01_02
            
       
public ExecutionContext generateExecutionContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ConfigurationProperties props) {
        Collection<ShardingSphereRule> rules = schema.getRules();
        // RouteEngine 生成 , 其中包好了 rules 对象其相关属性
        SQLRouteEngine sqlRouteEngine = new SQLRouteEngine(rules, props);
        // PS:M101_01_10
        SQLStatementContext<?> sqlStatementContext = logicSQL.getSqlStatementContext();
    
    	// 核心流程
        RouteContext routeContext = sqlRouteEngine.route(logicSQL, schema);
        SQLRewriteEntry rewriteEntry = new SQLRewriteEntry(schema.getMetaData().getSchemaMetaData().getConfiguredSchemaMetaData(), props, rules);
        SQLRewriteResult rewriteResult = rewriteEntry.rewrite(logicSQL.getSql(), logicSQL.getParameters(), sqlStatementContext, routeContext);
        Collection<ExecutionUnit> executionUnits = ExecutionContextBuilder.build(schema.getMetaData(), rewriteResult, sqlStatementContext);
        return new ExecutionContext(sqlStatementContext, executionUnits, routeContext);
}

PS: M101_01_10 Список связанных свойств image.png

Шаг 3: PartialSQLRouteExecutor генерирует результат с помощью правил

Обратите внимание, что создание или добавление RouteUnits будет выбрано здесь в зависимости от того, были ли созданы RouteUnits.

Поток обработки, после решения следующих двух проблем, конфигурация нормальная, и вы можете видеть, что правило существует в это время.

// 我们根据上面2个流程着重分析其中2步
C103- PartialSQLRouteExecutor
    M103_01- route
    	FOR- 遍历所有的 rule , 根据是否生成了 RouteUnits 分别调用
            - entry.getValue().createRouteContext(logicSQL, schema, entry.getKey(), props) -> M104_01 
            - entry.getValue().decorateRouteContext(result, logicSQL, schema, entry.getKey(), props -> M104_02

PS:M101_01_02 список правил

Sharding-jdbc-rules-list.jpg

Шаг 4: ReplicaQuerySQLRouter/правила обрабатывают основную логику

В этой логике есть 2 важных метода

  • createRouteContext : создать RouteContext
  • декорироватьRouteContext : дополнить RouteContext
C60- ReplicaQuerySQLRouter 
    M60_01- createRouteContext   
		- 获取首个 rule , 构建 ReplicaQueryDataSourceRouter             
		- 调用 route 获取 需要执行的 datasourceName
			- 最终会调用对应的 ReplicaLoadBalanceAlgorithm
			-> PS:M60_01_01               	
    M60_02- decorateRouteContext
                                                    

// 以轮询为例                                            
C61- RoundRobinReplicaLoadBalanceAlgorithm
    F61_01- ConcurrentHashMap<String, AtomicInteger> COUNTS                                         
	M61_01- getDataSource
		?- 轮询主要通过属性 F61_01 完成
                ?- 简单点说就是每次 For 中 , 这个COUNTS 都会 + 1 , 到了和总数一样的时候 , 执行 CAS 设置为 0 
                ?- 算法部分先不深入太多 , 后续专门整理
                    
          

// M104_01 createRouteContext 源代码
public RouteContext createRouteContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
	RouteContext result = new RouteContext();
	String dataSourceName = new ReplicaQueryDataSourceRouter(rule.getSingleDataSourceRule()).route(logicSQL.getSqlStatementContext().getSqlStatement());
	result.getRouteUnits().add(new RouteUnit(new RouteMapper(DefaultSchema.LOGIC_NAME, dataSourceName), Collections.emptyList()));
	return result;
}                                                
                                                    
// M104_02 decorateRouteContext 源代码                                               
public void decorateRouteContext(final RouteContext routeContext,
                                     final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
	Collection<RouteUnit> toBeRemoved = new LinkedList<>();
	Collection<RouteUnit> toBeAdded = new LinkedList<>();
	for (RouteUnit each : routeContext.getRouteUnits()) {
		String dataSourceName = each.getDataSourceMapper().getLogicName();
		Optional<ReplicaQueryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);
		if (dataSourceRule.isPresent() && dataSourceRule.get().getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
			toBeRemoved.add(each);
			String actualDataSourceName = new ReplicaQueryDataSourceRouter(dataSourceRule.get()).route(logicSQL.getSqlStatementContext().getSqlStatement());
			toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
		}
	}
	routeContext.getRouteUnits().removeAll(toBeRemoved);
	routeContext.getRouteUnits().addAll(toBeAdded);
}     
                                                    
// PS:M60_01_01  获取负载均衡的 datasource
rule.getLoadBalancer().getDataSource(rule.getName(), rule.getPrimaryDataSourceName(), rule.getReplicaDataSourceNames());                                        
                                                    
         
// M61_01 源代码     
public String getDataSource(final String name, final String primaryDataSourceName, final List<String> replicaDataSourceNames) {
        AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
        COUNTS.putIfAbsent(name, count);
        count.compareAndSet(replicaDataSourceNames.size(), 0);
        return replicaDataSourceNames.get(Math.abs(count.getAndIncrement()) % replicaDataSourceNames.size());
}
    

Шаг Дополнительная ссылка: Процесс обработки правил

Поток обработки правил в основном состоит из следующих шагов:

  • SQLRouteEngine: механизм обработки маршрута, запуск маршрута обработки
  • PartialSQLRouteExecutor: какой основной метод выбрать
  • ReplicaQuerySQLRouter: основной метод обработки
  • ReplicaQueryDataSourceRouter: вызовите алгоритм, чтобы получить имя источника.
  • RoundRobinReplicaLoadBalanceAlgorithm: ядро ​​алгоритма

Процесс построения правил в основном состоит из следующих этапов:

  • SchemaContextsBuilder: сборка ContextsBuilder
  • ShardingSphereRulesBuilder: выполнение основной логики сборки через настройку правил.
  • ReplicaQueryRuleBuilder: получите соответствующую конфигурацию и создайте основное правило.
  • ReplicaQueryRule: создаваемый объект

PS: Сборник правил M101_01_01

Когда я запускаю его в первый раз, я обнаруживаю, что правило пустое, и я понимаю, что найдена первая проблема. Конфигурация фактически отсутствует. Давайте посмотрим на систему наследования правила.

ShardingSphereRule.png

// 很明显 , 看到了一个 ReplicaQueryRule , 进去看看

C56- ReplicaQueryRule
    ?- 其中有2个主要的构造方法 , 通过 Configuration 进行的配置
    MC56_01- ReplicaQueryRule(final ReplicaQueryRuleConfiguration config)
    	?- 调用 ReplicaQueryRuleBuilder 进行构建 -> M57_01
    MC56_02- ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config)
    M01- getDataSourceMapper()

    
// 前置条件 , 配置 Rule
public final class ReplicaQueryRule implements DataSourceRoutedRule, StatusContainedRule {
    
    static {
        ShardingSphereServiceLoader.register(ReplicaLoadBalanceAlgorithm.class);
    }
    
    private final Map<String, ReplicaLoadBalanceAlgorithm> loadBalancers = new LinkedHashMap<>();
    
    private final Map<String, ReplicaQueryDataSourceRule> dataSourceRules;
    
	public ReplicaQueryRule(final ReplicaQueryRuleConfiguration config) {
        Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
        config.getLoadBalancers().forEach((key, value) -> loadBalancers.put(key, ShardingSphereAlgorithmFactory.createAlgorithm(value, ReplicaLoadBalanceAlgorithm.class)));
        dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
        for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
            // TODO check if can not find load balancer should throw exception.
            ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
                    ? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
            dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
        }
    }
    
    public ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config) {
        Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
        loadBalancers.putAll(config.getLoadBalanceAlgorithms());
        dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
        for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
            // TODO check if can not find load balancer should throw exception.
            ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
                    ? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
            dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
        }
    }
    
    //...........
    
}    


C57- ReplicaQueryRuleBuilder
	M57_01- build(final ReplicaQueryRuleConfiguration ruleConfig, final Collection<String> dataSourceNames)
		- new ReplicaQueryRule(ruleConfig) 构建 -> MC56_01
    
// 看到这里大概知道构建的位置了 , 上文看到过的类 C51- ShardingSphereRulesBuilder
C51- ShardingSphereRulesBuilder
    M51_01- build(final Collection<RuleConfiguration> ruleConfigurations, final Collection<String> dataSourceNames)
    	?- 该方法会通过 RuleConfiguration 集合构建 Rule 对象
    	?- 再往上就不需要看了 , 最终还是会回到构建 ShardingSphereDataSource , 只需要根据配置类猜测配置方式即可 -> PS:M51_01_01
    
    

Шаг Приложение: Методы анализа связанных конфигураций для разделения чтения и записи

Класс конфигурации по-прежнему несколько предыдущих

  • SpringBootConfiguration
  • YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
    • YamlReplicaQueryRuleConfiguration : replicaQuery
      • Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
      • Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers

Я понимаю это, но я не знаю, является ли документ, который я ищу, неправильным.Недостаточно полагаться на официальный документ.В конце концов, я должен полагаться на себя.

Этот раздел, если вы передаете Bean класса конфигурации Sharding,Конфигурация, связанная с защитой от развертывания.

PS: предположение о классе конфигурации M51_01_01

// 点开可以看到 , 其中只有2个属性
public final class ReplicaQueryRuleConfiguration implements RuleConfiguration {
    private final Collection<ReplicaQueryDataSourceRuleConfiguration> dataSources;
    private final Map<String, ShardingSphereAlgorithmConfiguration> loadBalancers;
}


// 这里再来说明一下 , 如何逆推配置 , Sharding 的配置解析方式是很常见的方式 , 按照类型和方法名去反写一个就行了 , 比如这个配置
spring.shardingsphere.rules.sharding.tables.t_blog.key-generate-strategy.column=id

- sharding 来源 : YamlShardingRuleSpringBootConfiguration

sharding-config.png

我们按照这个方法 , 结合官方文档 , 逆推一下 , 可以找到如下几个类 : 

- YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
	- YamlReplicaQueryRuleConfiguration : replicaQuery
		- Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
		- Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers
            
public final class YamlReplicaQueryDataSourceRuleConfiguration implements YamlConfiguration {
    private String name;
    private String primaryDataSourceName;
    private List<String> replicaDataSourceNames = new ArrayList<>();
    private String loadBalancerName;
    private Properties props = new Properties();
}  

public final class YamlShardingSphereAlgorithmConfiguration implements YamlConfiguration {
    private String type;
    private Properties props = new Properties();
}      
            
// 逆推得到读写分离的配置信息 : 
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=primary_ds_0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=primary_ds_0_replica_0, primary_ds_0_replica_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.primary-data-source-name=primary_ds_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.replica-data-source-names=primary_ds_1_replica_0, primary_ds_1_replica_1

// 小技巧 : 拿着 spring.shardingsphere.rules.replica-query 去网上搜 , 可能会有意想不到的惊喜  
    
最终配置结果 : 
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=
    
// 请求地址 , 可以看到分别从 ds0,ds1,ds2 获取信息 , 插入数据只会插入 ds0
// PS : 读写分离一般配合数据库主从同步做的    

Как вы можете сказать мне угадать это, не глядя на исходный код??????????????

image-20210508162525556.png

3. Конфигурация

Выложите полную конфигурацию сюда

server.port=8085
##Jpa配置
spring.jpa.database=mysql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=none
# 配置真实数据源 ds0,ds1,ds2
spring.shardingsphere.datasource.names=ds0,ds1,ds2
spring.shardingsphere.datasource.common.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.common.driver-class-name=com.mysql.jdbc.Driver
# 配置第 1 个数据源
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://127.0.0.1:3306/database0?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456
# 配置第 2 个数据源
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://127.0.0.1:3306/database1?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=123456
# 配置从库 3
spring.shardingsphere.datasource.ds2.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds2.jdbc-url=jdbc:mysql://127.0.0.1:3306/database2?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=123456
# Sharding 读写分离配置
# 是否显示 SQL
spring.shardingsphere.props.sql.show=true
# 是否 Bean 覆盖
spring.main.allowBeanDefinitionOverriding=true
# 官网提供的配置 , 这是哪里的?????????????????????
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.primary-data-source-name= ds0
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.replica-data-source-names= ds1,ds2
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.load-balancer-name= round_robin_type
# 实际配置信息
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=


Суммировать

Разделение чтения и записи на самом деле очень простое, в основном потому, что я не знаю метод настройки новой версии, и надеюсь, что этот документ может быть полезен для устранения неполадок в будущем.