Использование JPA и SHARDING-JDBC для динамического разделения месячной таблицы с нуля

Java

Начинать

Используйте spring-data-jpa и sharding-jdbc для динамической месячной таблицы с нуля, начните работу напрямую.

Заявление о потребностях

Количество данных поступает в соответствующую месячную таблицу по ключу сегмента (время хранения), а указанная таблица запрашивается по значению ключа сегмента, но каждый запрос должен приносить ключ сегмента, что не очень удобно, поэтому Кроме того, следующие инструкции о том, как запросить последние два месяца, если ключ сегмента не указан.

Предварительная подготовка

оператор создания таблицы

-- 逻辑表,每个月表都根据逻辑表生成
CREATE TABLE `EXAMPLE` (
  `ID` bigint(36) NOT NULL AUTO_INCREMENT,
  `NAME` varchar(255) NOT NULL,
  `CREATED` datetime(3) DEFAULT NULL,
  `UPDATED` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 月表
CREATE TABLE `EXAMPLE_201909` (
  `ID` bigint(36) NOT NULL AUTO_INCREMENT,
  `NAME` varchar(255) NOT NULL,
  `CREATED` datetime(3) DEFAULT NULL,
  `UPDATED` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `EXAMPLE_201910` (
  `ID` bigint(36) NOT NULL AUTO_INCREMENT,
  `NAME` varchar(255) NOT NULL,
  `CREATED` datetime(3) DEFAULT NULL,
  `UPDATED` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

класс сущности

@Entity
@Data
@Table(name = "EXAMPLE")
public class Example implements Serializable {
	private static final long serialVersionUID = 1L;
	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	@Column(name = "ID")
	private String id;
	@Column(name = "NAME")
	private String name;
	@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
	@Column(name = "CREATED")
	private Date created;
	@Column(name = "UPDATED", insertable = false, updatable = false)
	private Date updated;
}

repo

import java.util.Date;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import com.test.sharding.entity.Example;

public interface ExampleRepo extends JpaRepository<Example, Long>, JpaSpecificationExecutor<Example> {
	List<Example> findByCreatedBetween(Date start, Date end);
}

Зависимости Maven

После тестирования он поддерживает Springboot 2.0.X+ и 1.5.X+.

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-jpa</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>io.shardingsphere</groupId>
			<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
			<version>3.0.0</version>
		</dependency>
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>4.6.7</version>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid</artifactId>
			<version>1.1.20</version>
		</dependency>

Реализация алгоритма фрагментации

Поскольку выбранная стратегия сегментированияStandardShardingStrategy(Он будет настроен в конфигурационном файле позже), поэтому вам нужно попробовать следующие два алгоритма шардинга:

  • точный алгоритм шардинга
import java.util.Collection;
import java.util.Date;
import cn.hutool.core.date.DateUtil;
import io.shardingsphere.api.algorithm.sharding.PreciseShardingValue;
import io.shardingsphere.api.algorithm.sharding.standard.PreciseShardingAlgorithm;

public class MyPreciseShardingAlgorithm implements PreciseShardingAlgorithm<Date> {
  // 可以优化为全局变量
	private static String yearAndMonth = "yyyyMM";

	@Override
	public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Date> shardingValue) {
		StringBuffer tableName = new StringBuffer();
		tableName.append(shardingValue.getLogicTableName()).append("_")
				.append(DateUtil.format(shardingValue.getValue(), yearAndMonth));
		return tableName.toString();
	}
}
  • Алгоритм разделения диапазона
public class TimeRangeShardingAlgorithm implements RangeShardingAlgorithm<Date> {
	private static String yearAndMonth = "yyyyMM";
	/**
	 * 只查询最近两个月的数据
	 */
	@Override
	public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Date> shardingValue) {
		Collection<String> result = new LinkedHashSet<String>();
		Range<Date> range = shardingValue.getValueRange();
		// 获取范围
		String end = DateUtil.format(range.lowerEndpoint(), yearAndMonth);
		// 获取前一个月
		String start = DateUtil.format(range.upperEndpoint(), yearAndMonth);
		result.add(shardingValue.getLogicTableName() + "_" + start);
		if (!end.equals(start)) {
			result.add(shardingValue.getLogicTableName() + "_" + end);
		}
		return result;
	}

}

конфигурация application.yml

spring:
  datasource: # 可有可无,在配置了sharding之后,默认只会有sharding数据源生效
    type: com.alibaba.druid.pool.DruidDataSource
    url: jdbc:mysql://localhost:3306/ddssss
    username: root
    password: ppppppp
    tomcat:
      initial-size: 5
    driver-class-name: com.mysql.jdbc.Driver
  jpa:
    database: mysql
sharding:
  jdbc:
    datasource:
      names: month-0 # 数据源名称
      month-0:
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/ddssss
        username: root
        password: ppppppp
        type: com.alibaba.druid.pool.DruidDataSource
    config:
      sharding:
        tables:
          month: # 表名
            key-generator-column-name: id # 主键名称
            table-strategy:
              standard:
                sharding-column: ccreated # 分片键
                precise-algorithm-class-name: com.example.sharding.config.MyPreciseShardingAlgorithm # 实现类的完全限定类名
                range-algorithm-class-name: com.example.sharding.config.MyRangeShardingAlgorithm # 实现类的完全限定类名
        props:
          sql.show: true # 是否显示SQL ,默认为false

контрольная работа


import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.criteria.Predicate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.test.sharding.entity.Example;
import com.test.sharding.repository.ExampleRepo;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class StartRunner implements CommandLineRunner {
	@Autowired
	ExampleRepo exampleRepo;

	@Override
	public void run(String... args) throws Exception {
		log.info("==============init===================");
		Example example = new Example();
		example.setName("我的名字");
		example.setCreated(new Date());
		exampleRepo.save(example);
		log.info("example:{}", JSONObject.toJSONString(example));
		// 普通条件查询
		List<Example> list = exampleRepo.findAll(org.springframework.data.domain.Example.<Example>of(example));
		log.info("normal list :{}", JSONObject.toJSONString(list));
		// 动态条件查询
		Example condtion = new Example();
		condtion.setCreated(example.getCreated());
		list = exampleRepo.findAll(getIdSpecification(condtion));
		log.info("dynamic list :{}", JSONObject.toJSONString(list));
		// 范围查询
		Date end = new Date();
		list = exampleRepo.findByCreatedBetween(DateUtil.lastMonth()
				.toJdkDate(), end);
		log.info("range select list :{}", JSONObject.toJSONString(list));
	}

	protected Specification<Example> getIdSpecification(final Example condtion) {
		return (root, query, cb) -> {
			List<Predicate> list = new ArrayList<>();
			list.add(cb.equal(root.<Date>get("created"), condtion.getCreated()));
			Predicate[] predicates = new Predicate[list.size()];
			query.where(list.toArray(predicates));
			return query.getRestriction();
		};
	}
}

После запуска вы увидите журнал следующим образом:

база данных:

  • поверхность:

  • данные

постскриптум

Хотя это реализует динамическое разделение ежемесячного запроса и вставки в таблицу на основе времени, в реальном использовании все еще остается много мелких проблем.Например, метод сохранения все равно будет выполняться, когда указан первичный ключ.INSERTвместоUPDATE, вы должны принести ключ сегмента при запросе, и вам нужно вручную создать последующую месячную таблицу.

Для этих трех проблем необходима дальнейшая оптимизация.

причина проблемы

  1. Почему метод сохранения все еще работает, когда указан первичный ключINSERTвместоUPDATE

Когда указанный первичный ключ не пуст, JPA SAVE сначала обращается к таблице, чтобы проверить, существует ли первичный ключ, но условием запроса является только первичный ключ без ключа сегментирования.Стратегия Sharding-JDBC заключается в том, что ключ сегментирования не указан. Будут опрошены все сегментированные таблицы.

Но здесь есть недоразумение: Sharding-JDBC, активно запрашивающий все сегментированные таблицы, относится к случаю фиксированного сегментирования. Например, здесь есть еще одна таблица, судя по шардингу по четности идентификаторов, там две таблицы. Тогда все данные будут в двух таблицах, и мы тоже напрямую две таблицы настраиваем при настройке.

Это не относится к нашим текущим потребностям, потому что наши правила сегментирования таблиц основаны на времени, и каждый месяц и каждый год создается новая таблица, поэтому, если нет указанного ключа сегментирования, заслуживающего запроса, Sharding-JDBC по умолчанию запрашивает логический стол. В это время, если он возвращает пустое значение, JPA будет думать, что в первичном ключе нет данных, поэтому соответствующий SQLINSERTвместоUPDATE.

  1. Почему ключ сегмента должен быть включен при запросе?

Причина та же, что и выше: Sharding-JDBC запрашивает логическую таблицу по значению, когда ключ сегмента не указан.

  1. Последующие ежемесячные таблицы также необходимо создавать вручную.

Во-первых, нужно каждый месяц создавать соответствующую месячную таблицу.Конечно, можно сразу создать таблицу за несколько лет за один раз, но я чувствую, что это бессмысленно.Такого рода повторяющиеся вещи должна делать программа , и ежемесячное время создания должно быть выполнено.

решение

Для проблемы 1 и проблемы 2 я напрямую переписываю правила маршрутизации Sharding-JDBC, которые прекрасно решаются.

  • Переписать правила маршрутизации

класс нужно изменитьio.shardingsphere.core.routing.type.standard.StandardRoutingEngineизrouteTablesметод и объявляет статическую переменную для записи логической таблицы, которую необходимо разделить на таблицы.Конкретный код выглядит следующим образом:

// 时间格式化
private static String yearAndMonth = "yyyyMM";
// 保存需要分表的逻辑表
private static final Set<String> needRoutTables = new HashSet<>(
			Lists.newArrayList("EXAMPLE"));
	private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource,
			final List<ShardingValue> tableShardingValues) {
		Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
		// 路由表,根据分表算法得到,动态分表时如果条件里没有分片键则返回逻辑表,本文是:EXAMPLE
		Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
				: shardingRule.getTableShardingStrategy(tableRule)
						.doSharding(availableTargetTables, tableShardingValues));
		// 如果得到的路由表只有一个,因为大于2的情况都应该是制定了分片键的(分表是不建议联表查询的)
		if (routedTables.size() <= 1) {
			// 得到逻辑表名
			String routeTable = routedTables.iterator()
					.next();
			// 判断是否需要分表,true代表需要分表
			if (needRoutTables.contains(routeTable)) {
				// 移除逻辑表
				routedTables.remove(routeTable);
				Date now = new Date();
				// 月份后缀,默认最近两个月
				String nowSuffix = DateUtil.format(now, yearAndMonth);
				String lastMonthSuffix = DateUtil.format(DateUtil.lastMonth(), yearAndMonth);
				routedTables.add(routeTable + "_" + nowSuffix);
				routedTables.add(routeTable + "_" + lastMonthSuffix);
			}
		}
		Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
		Collection<DataNode> result = new LinkedList<>();
		for (String each : routedTables) {
			result.add(new DataNode(routedDataSource, each));
		}
		return result;
	}

Для проблемы 3 используйте программу для регулярного построения таблицы, здесь я не выбрал общий оператор построения таблицы:

-- ****** 日期,在程序里动态替换
CREATE TABLE `EXAMPLE_******` (
  `ID` bigint(36) NOT NULL AUTO_INCREMENT,
  `NAME` varchar(255) NOT NULL,
  `CREATED` datetime(3) DEFAULT NULL,
  `UPDATED` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Основными причинами являются следующие две

  1. В общих проектах полей таблицы вообще не так уж и мало, а оператор построения таблицы будет очень длинным.
  2. А последующее сопровождение нехорошо, любые изменения в таблице нужно сохранять в программе.

Я решил создать таблицу на основе шаблона, SQL выглядит следующим образом:

-- ****** 日期,在程序里动态替换
CREATE TABLE IF NOT EXISTS `EXAMPLE_******` LIKE `EXAMPLE`

Преимущество этого заключается в том, что оператор создания таблицы относительно прост, и нет необходимости заботиться о структуре таблицы.Все создается из шаблона для создания новой ежемесячной таблицы. Но это также приводит к новой проблеме, Sharding-JDBC не поддерживает такой синтаксис. Поэтому необходимо модифицировать исходный код и переписать правила перехвата. конкретный классio.shardingsphere.core.parsing.parser.sql.ddl.create.table.AbstractCreateTableParserизparseметод:

	public final DDLStatement parse() {
		lexerEngine.skipAll(getSkippedKeywordsBetweenCreateIndexAndKeyword());
		lexerEngine.skipAll(getSkippedKeywordsBetweenCreateAndKeyword());
		CreateTableStatement result = new CreateTableStatement();
		if (lexerEngine.skipIfEqual(DefaultKeyword.TABLE)) {
			lexerEngine.skipAll(getSkippedKeywordsBetweenCreateTableAndTableName());
		} else {
			throw new SQLParsingException("Can't support other CREATE grammar unless CREATE TABLE.");
		}
		tableReferencesClauseParser.parseSingleTableWithoutAlias(result);
		// 注释掉这个命令
		// lexerEngine.accept(Symbol.LEFT_PAREN);
		do {
			parseCreateDefinition(result);
		} while (lexerEngine.skipIfEqual(Symbol.COMMA));
		// 注释掉这个命令
		// lexerEngine.accept(Symbol.RIGHT_PAREN);
		return result;
	}

Суммировать

На этом этапе завершено полное динамическое разделение месячной таблицы.В целом это относительно просто.Настоящая трудность заключается в анализе исходного кода при возникновении проблем, и может разумно реализовать собственное деление в соответствии с их собственные бизнес-потребности логика таблиц.