Общая документация:Каталог статей
Github : github.com/black-ant
Введение
Я не собирался смотреть на код, который разделяет чтение и запись., но... хе-хе-хе опять проблема.После попытки настроить разделение чтения-записи по официальной документации ожидаемого результата не появилось.
Но к счастью, может запуститься, велика вероятность, что логика разделения чтения-записи не сработает
2. Анализ исходного кода
Самая большая трудность с предыдущей статьей в том, что я не знаю, с чего начать...
В это время выходит роль официального документа, он мне говорит, что есть интерфейсShardingSphereAlgorithm, поэтому ищите его класс реализации и находите что-то
можно увидеть,Разделение чтения-записи также играет роль в балансировке нагрузки в Sharding, и соответствующий интерфейс — ReplicaLoadBalanceAlgorithm.
2.1 Поиск записей
Сначала посмотрим, что нужно реализовать в интерфейсе
// 接口中提供了一个待实现的方法
public interface ReplicaLoadBalanceAlgorithm extends ShardingSphereAlgorithm {
// name : 查询逻辑数据源名称
// primaryDataSourceName : 主要数据源的名称
// replicaDataSourceNames : 副本数据源的名称
String getDataSource(String name, String primaryDataSourceName, List<String> replicaDataSourceNames);
}
Мы случайным образом выбираем алгоритм ShardingSphereAlgorithm, чтобы обратить процесс вспять, и, наконец, находим класс
- ReplicaQueryDataSourceRouter # route
- ReplicaQuerySQLRouter # createRouteContext
- PartialSQLRouteExecutor # route
Давайте обратим этот процесс:
Шаг Завершение: Окончательная обработка правила ReplicaQueryDataSourceRouter.
C55- ReplicaQueryDataSourceRouter
M55_01- route(final SQLStatement sqlStatement)
?- route 规则类 , 干了如下2件事
- 是否为 isPrimaryRoute , 如果是 , 直接获取 getPrimaryDataSourceName
- 如果不是 , 通过规则获取 rule.getLoadBalancer()
// PS : 打个断点发现和预料的一样 , 并没有执行相关的逻辑 , 继续往上推导
C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
M101_01- generateExecutionContext
Попутно я нашел основной класс обработкиKernelProcessor, когда попадешь сюда, то в принципе можешь быть уверен, иначе пройдетShardingSpherePreparedStatementиметь дело с
2.2 Сортировка прямого процесса
Зная основной процесс, мы можем вывести прямой процесс:
- C74- ShardingSpherePreparedStatement
-
C101- KernelProcessor
- M101_01- generateExecutionContext
-
C102- SQLRouteEngine
- route
-
C103- PartialSQLRouteExecutor
- route
-
C60- ReplicaQuerySQLRouter
- createRouteContext -- наконец добрался до сути
Шаг 1: ShardingSpherePreparedStatement создает ExecutionContext
Перейдите непосредственно к главному входу для отладки: ShardingSpherePreparedStatement, есть 2 основных метода executeQuery() / executeUpdate()
В прошлый раз я сказал executeUpdate, на этот раз я в основном использую executeQuery
C74- ShardingSpherePreparedStatement
M74_10- executeQuery
1- 创建 createExecutionContext() -> M101_01
2- 通过 getInputGroups() 获取 InputGroup 集合
3- reparedStatementExecutor.executeQuery(inputGroups) 执行 Query
4- 合并结果
5- 构建 ShardingSphereResultSet 返回
M74_11- createExecutionContext
1- createLogicSQL() 构建逻辑 SQL , 即需要查询的 SQL -> PS :
2- 通过 KernelProcessor 构建一个 ExecutionContext
?- 这里就和上文关联起来了 ->
// PS:M74_11 createLogicSQL 创建的 SQL
select blogentity0_.id as id1_0_, blogentity0_.author as author2_0_, blogentity0_.column_id as column_i3_0_, blogentity0_.date as date4_0_, blogentity0_.title as title5_0_, blogentity0_.title_id as title_id6_0_ from t_blog_0 blogentity0_
// M74_10 源代码
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
// 核心处理 , 此处已经把需要处理的 SQL 放入容器中
executionContext = createExecutionContext();
List<QueryResult> queryResults;
if (ExecutorConstant.MANAGED_RESOURCE) {
Collection<InputGroup<StatementExecuteUnit>> inputGroups = getInputGroups();
cacheStatements(inputGroups);
reply();
queryResults = preparedStatementExecutor.executeQuery(inputGroups);
} else {
queryResults = rawExecutor.executeQuery(getRawInputGroups(), new RawSQLExecutorCallback());
}
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingSphereResultSet(statements.stream().map(this::getResultSet).collect(Collectors.toList()), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
Это основная якорная точка. Как только эта точка будет найдена, ее будет легко проанализировать позже.
PS: Последующий общий процесс такой же, как и раньше, в основном следующий процесс
Шаг 2: KernelProcessor строит основную логику процесса ExecutionContext.
C101- KernelProcessor ,这是 route 的核心处理类 , 再看一下重要的流程
M101_01- generateExecutionContext
1- 获取 rules 集合
?- 此处解决了第一个配置问题 , 详见-> PS:M101_01_01
?- 配置改好后 , 可以看到 rule 已经正常了 -> PS:M101_01_02
public ExecutionContext generateExecutionContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ConfigurationProperties props) {
Collection<ShardingSphereRule> rules = schema.getRules();
// RouteEngine 生成 , 其中包好了 rules 对象其相关属性
SQLRouteEngine sqlRouteEngine = new SQLRouteEngine(rules, props);
// PS:M101_01_10
SQLStatementContext<?> sqlStatementContext = logicSQL.getSqlStatementContext();
// 核心流程
RouteContext routeContext = sqlRouteEngine.route(logicSQL, schema);
SQLRewriteEntry rewriteEntry = new SQLRewriteEntry(schema.getMetaData().getSchemaMetaData().getConfiguredSchemaMetaData(), props, rules);
SQLRewriteResult rewriteResult = rewriteEntry.rewrite(logicSQL.getSql(), logicSQL.getParameters(), sqlStatementContext, routeContext);
Collection<ExecutionUnit> executionUnits = ExecutionContextBuilder.build(schema.getMetaData(), rewriteResult, sqlStatementContext);
return new ExecutionContext(sqlStatementContext, executionUnits, routeContext);
}
PS: M101_01_10 Список связанных свойств
Шаг 3: PartialSQLRouteExecutor генерирует результат с помощью правил
Обратите внимание, что создание или добавление RouteUnits будет выбрано здесь в зависимости от того, были ли созданы RouteUnits.
Поток обработки, после решения следующих двух проблем, конфигурация нормальная, и вы можете видеть, что правило существует в это время.
// 我们根据上面2个流程着重分析其中2步
C103- PartialSQLRouteExecutor
M103_01- route
FOR- 遍历所有的 rule , 根据是否生成了 RouteUnits 分别调用
- entry.getValue().createRouteContext(logicSQL, schema, entry.getKey(), props) -> M104_01
- entry.getValue().decorateRouteContext(result, logicSQL, schema, entry.getKey(), props -> M104_02
PS:M101_01_02 список правил
Шаг 4: ReplicaQuerySQLRouter/правила обрабатывают основную логику
В этой логике есть 2 важных метода
- createRouteContext : создать RouteContext
- декорироватьRouteContext : дополнить RouteContext
C60- ReplicaQuerySQLRouter
M60_01- createRouteContext
- 获取首个 rule , 构建 ReplicaQueryDataSourceRouter
- 调用 route 获取 需要执行的 datasourceName
- 最终会调用对应的 ReplicaLoadBalanceAlgorithm
-> PS:M60_01_01
M60_02- decorateRouteContext
// 以轮询为例
C61- RoundRobinReplicaLoadBalanceAlgorithm
F61_01- ConcurrentHashMap<String, AtomicInteger> COUNTS
M61_01- getDataSource
?- 轮询主要通过属性 F61_01 完成
?- 简单点说就是每次 For 中 , 这个COUNTS 都会 + 1 , 到了和总数一样的时候 , 执行 CAS 设置为 0
?- 算法部分先不深入太多 , 后续专门整理
// M104_01 createRouteContext 源代码
public RouteContext createRouteContext(final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
RouteContext result = new RouteContext();
String dataSourceName = new ReplicaQueryDataSourceRouter(rule.getSingleDataSourceRule()).route(logicSQL.getSqlStatementContext().getSqlStatement());
result.getRouteUnits().add(new RouteUnit(new RouteMapper(DefaultSchema.LOGIC_NAME, dataSourceName), Collections.emptyList()));
return result;
}
// M104_02 decorateRouteContext 源代码
public void decorateRouteContext(final RouteContext routeContext,
final LogicSQL logicSQL, final ShardingSphereSchema schema, final ReplicaQueryRule rule, final ConfigurationProperties props) {
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
for (RouteUnit each : routeContext.getRouteUnits()) {
String dataSourceName = each.getDataSourceMapper().getLogicName();
Optional<ReplicaQueryDataSourceRule> dataSourceRule = rule.findDataSourceRule(dataSourceName);
if (dataSourceRule.isPresent() && dataSourceRule.get().getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
toBeRemoved.add(each);
String actualDataSourceName = new ReplicaQueryDataSourceRouter(dataSourceRule.get()).route(logicSQL.getSqlStatementContext().getSqlStatement());
toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
}
}
routeContext.getRouteUnits().removeAll(toBeRemoved);
routeContext.getRouteUnits().addAll(toBeAdded);
}
// PS:M60_01_01 获取负载均衡的 datasource
rule.getLoadBalancer().getDataSource(rule.getName(), rule.getPrimaryDataSourceName(), rule.getReplicaDataSourceNames());
// M61_01 源代码
public String getDataSource(final String name, final String primaryDataSourceName, final List<String> replicaDataSourceNames) {
AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0);
COUNTS.putIfAbsent(name, count);
count.compareAndSet(replicaDataSourceNames.size(), 0);
return replicaDataSourceNames.get(Math.abs(count.getAndIncrement()) % replicaDataSourceNames.size());
}
Шаг Дополнительная ссылка: Процесс обработки правил
Поток обработки правил в основном состоит из следующих шагов:
- SQLRouteEngine: механизм обработки маршрута, запуск маршрута обработки
- PartialSQLRouteExecutor: какой основной метод выбрать
- ReplicaQuerySQLRouter: основной метод обработки
- ReplicaQueryDataSourceRouter: вызовите алгоритм, чтобы получить имя источника.
- RoundRobinReplicaLoadBalanceAlgorithm: ядро алгоритма
Процесс построения правил в основном состоит из следующих этапов:
- SchemaContextsBuilder: сборка ContextsBuilder
- ShardingSphereRulesBuilder: выполнение основной логики сборки через настройку правил.
- ReplicaQueryRuleBuilder: получите соответствующую конфигурацию и создайте основное правило.
- ReplicaQueryRule: создаваемый объект
PS: Сборник правил M101_01_01
Когда я запускаю его в первый раз, я обнаруживаю, что правило пустое, и я понимаю, что найдена первая проблема. Конфигурация фактически отсутствует. Давайте посмотрим на систему наследования правила.
// 很明显 , 看到了一个 ReplicaQueryRule , 进去看看
C56- ReplicaQueryRule
?- 其中有2个主要的构造方法 , 通过 Configuration 进行的配置
MC56_01- ReplicaQueryRule(final ReplicaQueryRuleConfiguration config)
?- 调用 ReplicaQueryRuleBuilder 进行构建 -> M57_01
MC56_02- ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config)
M01- getDataSourceMapper()
// 前置条件 , 配置 Rule
public final class ReplicaQueryRule implements DataSourceRoutedRule, StatusContainedRule {
static {
ShardingSphereServiceLoader.register(ReplicaLoadBalanceAlgorithm.class);
}
private final Map<String, ReplicaLoadBalanceAlgorithm> loadBalancers = new LinkedHashMap<>();
private final Map<String, ReplicaQueryDataSourceRule> dataSourceRules;
public ReplicaQueryRule(final ReplicaQueryRuleConfiguration config) {
Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
config.getLoadBalancers().forEach((key, value) -> loadBalancers.put(key, ShardingSphereAlgorithmFactory.createAlgorithm(value, ReplicaLoadBalanceAlgorithm.class)));
dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
// TODO check if can not find load balancer should throw exception.
ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
}
}
public ReplicaQueryRule(final AlgorithmProvidedReplicaQueryRuleConfiguration config) {
Preconditions.checkArgument(!config.getDataSources().isEmpty(), "Replica query data source rules can not be empty.");
loadBalancers.putAll(config.getLoadBalanceAlgorithms());
dataSourceRules = new HashMap<>(config.getDataSources().size(), 1);
for (ReplicaQueryDataSourceRuleConfiguration each : config.getDataSources()) {
// TODO check if can not find load balancer should throw exception.
ReplicaLoadBalanceAlgorithm loadBalanceAlgorithm = Strings.isNullOrEmpty(each.getLoadBalancerName()) || !loadBalancers.containsKey(each.getLoadBalancerName())
? TypedSPIRegistry.getRegisteredService(ReplicaLoadBalanceAlgorithm.class) : loadBalancers.get(each.getLoadBalancerName());
dataSourceRules.put(each.getName(), new ReplicaQueryDataSourceRule(each, loadBalanceAlgorithm));
}
}
//...........
}
C57- ReplicaQueryRuleBuilder
M57_01- build(final ReplicaQueryRuleConfiguration ruleConfig, final Collection<String> dataSourceNames)
- new ReplicaQueryRule(ruleConfig) 构建 -> MC56_01
// 看到这里大概知道构建的位置了 , 上文看到过的类 C51- ShardingSphereRulesBuilder
C51- ShardingSphereRulesBuilder
M51_01- build(final Collection<RuleConfiguration> ruleConfigurations, final Collection<String> dataSourceNames)
?- 该方法会通过 RuleConfiguration 集合构建 Rule 对象
?- 再往上就不需要看了 , 最终还是会回到构建 ShardingSphereDataSource , 只需要根据配置类猜测配置方式即可 -> PS:M51_01_01
Шаг Приложение: Методы анализа связанных конфигураций для разделения чтения и записи
Класс конфигурации по-прежнему несколько предыдущих
- SpringBootConfiguration
- YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
- YamlReplicaQueryRuleConfiguration : replicaQuery
- Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
- Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers
- YamlReplicaQueryRuleConfiguration : replicaQuery
Я понимаю это, но я не знаю, является ли документ, который я ищу, неправильным.Недостаточно полагаться на официальный документ.В конце концов, я должен полагаться на себя.
Этот раздел, если вы передаете Bean класса конфигурации Sharding,Конфигурация, связанная с защитой от развертывания.
PS: предположение о классе конфигурации M51_01_01
// 点开可以看到 , 其中只有2个属性
public final class ReplicaQueryRuleConfiguration implements RuleConfiguration {
private final Collection<ReplicaQueryDataSourceRuleConfiguration> dataSources;
private final Map<String, ShardingSphereAlgorithmConfiguration> loadBalancers;
}
// 这里再来说明一下 , 如何逆推配置 , Sharding 的配置解析方式是很常见的方式 , 按照类型和方法名去反写一个就行了 , 比如这个配置
spring.shardingsphere.rules.sharding.tables.t_blog.key-generate-strategy.column=id
- sharding 来源 : YamlShardingRuleSpringBootConfiguration
我们按照这个方法 , 结合官方文档 , 逆推一下 , 可以找到如下几个类 :
- YamlReplicaQueryRuleSpringBootConfiguration : spring.shardingsphere.rules
- YamlReplicaQueryRuleConfiguration : replicaQuery
- Map<String, YamlReplicaQueryDataSourceRuleConfiguration> : dataSources
- Map<String, YamlShardingSphereAlgorithmConfiguration> : loadBalancers
public final class YamlReplicaQueryDataSourceRuleConfiguration implements YamlConfiguration {
private String name;
private String primaryDataSourceName;
private List<String> replicaDataSourceNames = new ArrayList<>();
private String loadBalancerName;
private Properties props = new Properties();
}
public final class YamlShardingSphereAlgorithmConfiguration implements YamlConfiguration {
private String type;
private Properties props = new Properties();
}
// 逆推得到读写分离的配置信息 :
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=primary_ds_0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=primary_ds_0_replica_0, primary_ds_0_replica_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.primary-data-source-name=primary_ds_1
spring.shardingsphere.rules.replica-query.data-sources.ds_1.replica-data-source-names=primary_ds_1_replica_0, primary_ds_1_replica_1
// 小技巧 : 拿着 spring.shardingsphere.rules.replica-query 去网上搜 , 可能会有意想不到的惊喜
最终配置结果 :
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=
// 请求地址 , 可以看到分别从 ds0,ds1,ds2 获取信息 , 插入数据只会插入 ds0
// PS : 读写分离一般配合数据库主从同步做的
Как вы можете сказать мне угадать это, не глядя на исходный код??????????????
3. Конфигурация
Выложите полную конфигурацию сюда
server.port=8085
##Jpa配置
spring.jpa.database=mysql
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=none
# 配置真实数据源 ds0,ds1,ds2
spring.shardingsphere.datasource.names=ds0,ds1,ds2
spring.shardingsphere.datasource.common.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.common.driver-class-name=com.mysql.jdbc.Driver
# 配置第 1 个数据源
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://127.0.0.1:3306/database0?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456
# 配置第 2 个数据源
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://127.0.0.1:3306/database1?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=123456
# 配置从库 3
spring.shardingsphere.datasource.ds2.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds2.jdbc-url=jdbc:mysql://127.0.0.1:3306/database2?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=123456
# Sharding 读写分离配置
# 是否显示 SQL
spring.shardingsphere.props.sql.show=true
# 是否 Bean 覆盖
spring.main.allowBeanDefinitionOverriding=true
# 官网提供的配置 , 这是哪里的?????????????????????
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.primary-data-source-name= ds0
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.replica-data-source-names= ds1,ds2
#spring.shardingsphere.rules.readwrite-splitting.data-sources.dataSource.load-balancer-name= round_robin_type
# 实际配置信息
spring.shardingsphere.rules.replica-query.data-sources.ds_0.name=rq-ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.primary-data-source-name=ds0
spring.shardingsphere.rules.replica-query.data-sources.ds_0.replica-data-source-names=ds0,ds1,ds2
# Load balance 算法
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.type=round_robin
spring.shardingsphere.rules.replica-query.load-balancers.rq-ds0.props.null=
Суммировать
Разделение чтения и записи на самом деле очень простое, в основном потому, что я не знаю метод настройки новой версии, и надеюсь, что этот документ может быть полезен для устранения неполадок в будущем.