Теги: спрингбатч
1. Введение
Предыдущий"Удобное чтение и запись данных — весенний пакет (5) в сочетании с beetlSql для чтения и записи данных"Используется вSpring Batch
а такжеBeetlSql
, чтобы синхронизировать компоненты чтения и записи базы данных, что фактически является полной синхронизацией. Проблема с полной синхронизацией заключается в том, что все данные таблицы нужно читать каждый раз, если данные таблицы большие, потребление ресурсов велико, а обновлять существующие данные неудобно. Поэтому в процессе синхронизации данных больше используется инкрементальная синхронизация, то есть через определенные условия выделяют новые данные для вставки, обновляют данные, которые изменились, удаляют данные, которых нет и т. д. (конечно, в общем случае не данные будут удалены физически и только логически удалены, поэтому это становится операцией обновления данных).
В большинстве случаев добавочного обновления оно должно основываться на состоянии после последнего обновления (например, время, идентификатор автоинкремента, расположение данных и т. д.), а следующее обновление основывается на состоянии предыдущего обновления. Поэтому необходимо использовать переменную после каждого обновления. Метод параметров сохраняется, и следующее обновление будет использовать эти данные состояния в качестве динамических параметров.Spring Batch
Он поддерживает динамические параметры во время выполнения задачи.В сочетании с этой функцией может быть реализована инкрементная синхронизация данных.
2. Среда разработки
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- Среда разработки: ИДЕЯ
- Инструменты сборки Maven: 3.3.9
- Журнал компонента журнала: 1.2.3
- lombok:1.18.6
3. Краткий обзор инкрементальной синхронизации
Инкрементная синхронизация относится к полной синхронизации, то есть для каждой синхронизации необходимо синхронизировать только измененную часть исходной базы данных, что повышает эффективность синхронизации данных. Это обычный способ синхронизации текущих данных. Извлечь измененные данные, также известные какCDC
,Прямо сейчасChange Data Capture
Изменение сбора данных. существуетРешение Pentaho Kettle: создание решений ETL с открытым исходным кодом с помощью PDIкнига, даCDC
сделал более подробное описание. Вот краткое описание.В настоящее время существует четыре способа достижения инкрементальной синхронизации, которые основаны на исходных данных.CDC
, на основе триггераCDC
, на основе снимковCDC
, на основе журналаCDC
.
3.1 По исходным даннымCDC
на основе исходных данныхCDC
Исходные данные должны иметь соответствующие столбцы атрибутов. Используя эти столбцы атрибутов, вы можете определить, где находятся добавочные данные. Наиболее распространенные столбцы атрибутов:
-
отметка времени Для идентификации данных по времени требуется как минимум один раз, а лучше два: один для определения времени создания и один для определения времени обновления, поэтому обычно мы добавляем его при проектировании базы данных.
sys_create_time
а такжеsys_update_time
В качестве полей по умолчанию и предназначены для текущего времени по умолчанию и обработки обновлений. -
самоувеличивающаяся последовательность Используйте самоувеличивающееся поле последовательности (обычно первичный ключ) таблицы базы данных, чтобы идентифицировать вновь вставленные данные. Однако в действительности он используется меньше.
Для этого метода требуется временная таблица для хранения времени последнего обновления, или на практике эта таблица обычно создается в отдельной схеме для хранения данных. Следующее обновление сравнивает последнее время или последовательность. Это наиболее часто используемый метод, и добавочная синхронизация в этой статье также использует этот метод.
3.2 На основе триггераCDC
Запишите триггер в базу данных, текущая база данных выполняетсяINSERT
,UPDATE
,DELETE
При ожидании оператора может быть активирован триггер в базе данных, после чего триггер может сохранить измененные данные в промежуточную временную таблицу, а затем получить данные из временной таблицы и синхронизировать их с целевой базой данных. Конечно, этот метод является наиболее инвазивным, и общие базы данных не позволяют добавлять триггеры в базу данных (влияющие на производительность).
3.3 На основе снимковCDC
Этот метод заключается в том, чтобы извлечь все текущие данные за один раз и поместить их в буфер в виде снимка, прочитать данные из исходных данных при следующей синхронизации, а затем сравнить их со снимком, чтобы узнать измененные данные. Проще говоря, это прочитать и сравнить всю таблицу, чтобы узнать измененные данные. При полном сканировании таблицы проблема заключается в производительности, поэтому этот метод обычно не используется.
3.4 На основе журналаCDC
Наиболее продвинутым и наименее инвазивным методом является метод на основе журнала.База данных будет записывать в журнал операции вставки, обновления и удаления, такие какMysql
будутbinlog
, Инкрементальная синхронизация может прочитать лог-файл, превратить бинарный файл в понятный вид, а затем повторить операции по порядку. Однако этот метод действителен только для баз данных одного типа и не может быть реализован для разнородных баз данных. И добиться этого трудно.
3.5 Описание метода инкрементной синхронизации в этом примере
В этом примере, все еще основанном наtest_user
Таблица инкрементно синхронизируется, и в таблице есть поляsys_create_time
а такжеsys_update_time
Чтобы определить время создания и обновления данных (в настоящее время, если в реальности есть только одно время, оно может быть основано только на этом времени, но сложнее определить, обновляются данные или вставляются). Процесс инкрементной синхронизации выглядит следующим образом:
проиллюстрировать:
- Для каждой синхронизации временная таблица будет считываться первой, чтобы получить время данных после последней синхронизации.
- Если синхронизация первая, то синхронизируются все, если нет, то время используется как параметр оператора запроса.
- Прочитав данные по времени, вставьте данные в целевую таблицу.
- Обновите время данных временной таблицы для следующей синхронизации.
4. Привязка динамических параметров Spring Batch
В соответствии с описанным выше процессом инкрементной синхронизации ключевым моментом является сохранение времени данных во временной таблице, которую можно использовать в качестве условия сравнения при чтении данных. И этот параметр времени является динамическим и передается при выполнении задачи.Spring Batch
, поддерживает привязку динамических параметров, просто используйте@StepScope
Аннотация может быть объединенаBeetlSql
, и вскоре станет возможной добавочная синхронизация. Этот пример основан напредыдущий постпример для дальнейшего развития, вы можетеСкачать исходный кодПосмотрите полный пример.
4.1 Наследовать исходную конфигурацию базы данных и несколько источников данных
- Исходная база данных:
mytest
- Целевая база данных:
my_test1
- база данных весенней партии:
my_spring_batch
- Таблица синхронизированных данных:
test_user
4.2 Создание временной таблицы
Используя пример вsql/initCdcTempTable.sql
,существуетmy_spring_batch
В библиотеке создайте временную таблицуcdc_temp
, и вставьте запись как1
записей, идентифицированных как синхронизацияtest_user
поверхность. Здесь нам нужно только обратить внимание наlast_update_time
а такжеcurrent_update_time
, первое указывает последнее время данных после последней синхронизации, а второе указывает системное время после последней синхронизации.
4.3 Добавить/изменить дао
4.3.1 Добавление временной таблицы dao и класса обслуживания
- добавить класс
CdcTempRepository
В зависимости от конфигурации, посколькуcdc_temp
вmy_spring_batch
, в то время как его чтение и запись находятся вdao.local
пакет, поэтому вам нужно добавитьdao.local
package, затем добавьте классCdcTempRepository
,Следующим образом:
@Repository
public interface CdcTempRepository extends BaseMapper<CdcTemp> {
}
- добавить класс
CdcTempService
, дляcdc_temp
Чтение таблицы и обновление данных В основном он включает в себя две функции, одна из которых - получение текущегоcdc_temp
Записывается, чтобы получить время последней синхронизации данных. Один из них — обновить после завершения синхронизации.cdc_temp
Данные. следующим образом:
/**
* 根据id获取cdc_temp的记录
* @param id 记录ID
* @return {@link CdcTemp}
*/
public CdcTemp getCurrentCdcTemp(int id){
return cdcTempRepository.getSQLManager().single(CdcTemp.class, id);
}
/**
* 根据参数更新cdcTemp表的数据
* @param cdcTempId cdcTempId
* @param status job状态
* @param lastUpdateTime 最后更新时间
*/
public void updateCdcTempAfterJob(int cdcTempId,BatchStatus status,Date lastUpdateTime){
//获取
CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class, cdcTempId);
cdcTemp.setCurrentUpdateTime(DateUtil.date());
//正常完成则更新数据时间
if( status == BatchStatus.COMPLETED){
cdcTemp.setLastUpdateTime(lastUpdateTime);
}else{
log.info(LogConstants.LOG_TAG+"同步状态异常:"+ status.toString());
}
//设置同步状态
cdcTemp.setStatus(status.name());
cdcTempRepository.updateById(cdcTemp);
}
4.3.2 Изменить дао исходных данных
В классе дао исходных данныхOriginUserRepository
добавить функциюgetOriginIncreUser
, эта функция соответствуетuser.md
серединаsql
утверждение.
4.3.3 Изменить целевой дао данных
В целевом классе дао данныхTargetUserRepository
добавить функцию вselectMaxUpdateTime
, который используется для запроса последнего времени данных после синхронизации. Поскольку sql этого метода прост, его можно использовать напрямую@Sql
Аннотация, как показано ниже:
@Sql(value="select max(sys_update_time) from test_user")
Date selectMaxUpdateTime();
4.4 Модификацияuser.md
серединаsql
утверждение.
4.4.1 Добавление инкрементных данных чтения sql
существуетuser.md
Добавьте оператор sql для постепенного чтения данных следующим образом:
getOriginIncreUser
===
* 查询user数据
select * from test_user
WHERE 1=1
@if(!isEmpty(lastUpdateTime)){
AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)
@}
проиллюстрировать:
-
@
начинается сbeetl
Синтаксис переменной можно прочитать и логически осмыслить.Здесь это означает, что если переменнаяlastUpdateTime
Если он не пустой, читаем согласно этому условию. -
lastUpdateTime
Переменная передается вызовом (Map
) - специфический
beetl
Используйте синтаксис, см.официальная документация
4.4.2 Запись инструкции инкрементной вставки sql
дляMysql
база данных, сinsert into ... on duplicate key update ...
Использование , то есть по уникальному ключу (первичному ключу или уникальному индексу), если данные уже существуют, они будут обновлены, а если их нет, они будут вставлены. существуетuser.md
файл, добавьте следующее утверждение:
insertIncreUser
===
* 插入数据
insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#
,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)
ON DUPLICATE KEY UPDATE
id = VALUES(id),
name = VALUES(name),
phone = VALUES(phone),
title = VALUES(title),
email = VALUES(email),
gender = VALUES(gender),
date_of_birth = VALUES(date_of_birth),
sys_create_time = VALUES(sys_create_time),
sys_create_user = VALUES(sys_create_user),
sys_update_time = VALUES(sys_update_time),
sys_update_user = VALUES(sys_update_user)
4.5 Написание компонентов пружинной партии
Spring Batch
Структура файла следующая:
4.5.1 ItemReader
Это то же самое, что и раньше, только нужно поставитьgetOriginUser
функция изменена наgetOriginIncreUser
Вот и все.
4.5.2 ItemWriter
Это то же самое, что и предыдущее, только нужно изменить идентификатор sql сuser.insertUser
изменить наuser.insertIncreUser
Вот и все.
4.5.3 ДобавитьIncrementJobEndListener
После синхронизации данных последним шагом является обновление последних временных данных временной таблицы. следующим образом:
@Slf4j
public class IncrementJobEndListener extends JobExecutionListenerSupport {
@Autowired
private CdcTempService cdcTempService;
@Autowired
private TargetUserRepository targetUserRepository;
@Override
public void afterJob(JobExecution jobExecution) {
BatchStatus status = jobExecution.getStatus();
Date latestDate = targetUserRepository.selectMaxUpdateTime();
cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER,status,latestDate);
}
}
проиллюстрировать:
- Сначала запросите последнее время данных в текущей базе данных (
selectMaxUpdateTime
) - Обновить данные промежуточной таблицы
cdc_temp
серединаlast_update_time
4.5.4 Добавить инициализацию параметра при запуске задачи
На первом шаге синхронизации данных необходимо инициализировать время последнего обновления данных во временной таблице, поэтому перед запуском задачи необходимо задать параметры задачи, чтобы можно было передать параметры времени к задаче и используется при выполнении задачи. следующим образом:
public JobParameters initJobParam(){
CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());
//若未初始化,则先查询数据库中对应的最后时间
if(SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())
|| SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())){
Date maxUpdateTime = selectMaxUpdateTime();
//若没有数据,则按初始时间处理
if(Objects.nonNull(maxUpdateTime)){
currentCdcTemp.setLastUpdateTime(maxUpdateTime);
}
}
return JobUtil.makeJobParameters(currentCdcTemp);
}
4.5.5 Сборка полной задачи
Наконец, вам нуженIncrementBatchConfig
Конфигурация собирает чтение, обработку, запись и мониторинг.Стоит упомянуть, что при настройке компонента чтения, поскольку необходимо использовать динамические параметры, его необходимо добавить сюда.@StepScope
Аннотация, также используемая в параметрахspEL
Получите содержимое параметра следующим образом:
@Bean
@StepScope
public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {
IncrementUserItemReader userItemReader = new IncrementUserItemReader();
//设置参数,当前示例可不设置参数
Map<String,Object> params = CollUtil.newHashMap();
params.put(SyncConstants.STR_LAST_UPDATE_TIME,lastUpdateTime);
userItemReader.setParams(params);
return userItemReader;
}
4.5.6 Тестирование
Обратитесь к предыдущей статьеBeetlsqlJobTest
,записыватьIncrementJobTest
тестовый файл. Из-за необходимости протестировать добавочную синхронизацию процесс тестирования выглядит следующим образом:
- Инкрементальное добавление данных перед тестированием
Перед тестом в таблице исходных данных и таблице целевых данных уже есть данные, в таблице исходных данных выполните код в
sql/user-data-new.sql
Добавьте новых пользователей. Обратите внимание, что из-заsys_create_time
а такжеsys_update_time
Определяется следующим образом:
`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
Таким образом, время автоматически генерируется при вставке данных, а также автоматически обновляется при изменении данных.
-
запустить тест запускать как модульные тесты
incrementJob
. -
Посмотреть Результаты После запуска результат такой:
После инкрементальной синхронизации данные выглядят следующим образом:
5. Резюме
В этой статье сначала дается краткое введение в добавочную синхронизацию, перечислены наиболее часто используемые в настоящее время методы добавочной синхронизации, а затем используютсяSpring Batch
а такжеBeetlSql
Инкрементальная синхронизация реализована с использованием методов на основе временных меток Этот пример имеет определенную практическую ценность, и я надеюсь, что он будет полезен разработчикам, которые занимаются синхронизацией данных или связанной с ней пакетной обработкой.