Инкрементная синхронизация - динамическая привязка параметров spring batch (6) и инкрементная синхронизация

Spring

Теги: спрингбатч


1. Введение

Предыдущий"Удобное чтение и запись данных — весенний пакет (5) в сочетании с beetlSql для чтения и записи данных"Используется вSpring Batchа такжеBeetlSql, чтобы синхронизировать компоненты чтения и записи базы данных, что фактически является полной синхронизацией. Проблема с полной синхронизацией заключается в том, что все данные таблицы нужно читать каждый раз, если данные таблицы большие, потребление ресурсов велико, а обновлять существующие данные неудобно. Поэтому в процессе синхронизации данных больше используется инкрементальная синхронизация, то есть через определенные условия выделяют новые данные для вставки, обновляют данные, которые изменились, удаляют данные, которых нет и т. д. (конечно, в общем случае не данные будут удалены физически и только логически удалены, поэтому это становится операцией обновления данных).

В большинстве случаев добавочного обновления оно должно основываться на состоянии после последнего обновления (например, время, идентификатор автоинкремента, расположение данных и т. д.), а следующее обновление основывается на состоянии предыдущего обновления. Поэтому необходимо использовать переменную после каждого обновления. Метод параметров сохраняется, и следующее обновление будет использовать эти данные состояния в качестве динамических параметров.Spring BatchОн поддерживает динамические параметры во время выполнения задачи.В сочетании с этой функцией может быть реализована инкрементная синхронизация данных.

2. Среда разработки

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • Среда разработки: ИДЕЯ
  • Инструменты сборки Maven: 3.3.9
  • Журнал компонента журнала: 1.2.3
  • lombok:1.18.6

3. Краткий обзор инкрементальной синхронизации

Инкрементная синхронизация относится к полной синхронизации, то есть для каждой синхронизации необходимо синхронизировать только измененную часть исходной базы данных, что повышает эффективность синхронизации данных. Это обычный способ синхронизации текущих данных. Извлечь измененные данные, также известные какCDC,Прямо сейчасChange Data CaptureИзменение сбора данных. существуетРешение Pentaho Kettle: создание решений ETL с открытым исходным кодом с помощью PDIкнига, даCDCсделал более подробное описание. Вот краткое описание.В настоящее время существует четыре способа достижения инкрементальной синхронизации, которые основаны на исходных данных.CDC, на основе триггераCDC, на основе снимковCDC, на основе журналаCDC.

3.1 По исходным даннымCDC

на основе исходных данныхCDCИсходные данные должны иметь соответствующие столбцы атрибутов. Используя эти столбцы атрибутов, вы можете определить, где находятся добавочные данные. Наиболее распространенные столбцы атрибутов:

  • отметка времени Для идентификации данных по времени требуется как минимум один раз, а лучше два: один для определения времени создания и один для определения времени обновления, поэтому обычно мы добавляем его при проектировании базы данных.sys_create_timeа такжеsys_update_timeВ качестве полей по умолчанию и предназначены для текущего времени по умолчанию и обработки обновлений.

  • самоувеличивающаяся последовательность Используйте самоувеличивающееся поле последовательности (обычно первичный ключ) таблицы базы данных, чтобы идентифицировать вновь вставленные данные. Однако в действительности он используется меньше.

Для этого метода требуется временная таблица для хранения времени последнего обновления, или на практике эта таблица обычно создается в отдельной схеме для хранения данных. Следующее обновление сравнивает последнее время или последовательность. Это наиболее часто используемый метод, и добавочная синхронизация в этой статье также использует этот метод.

3.2 На основе триггераCDC

Запишите триггер в базу данных, текущая база данных выполняетсяINSERT,UPDATE,DELETEПри ожидании оператора может быть активирован триггер в базе данных, после чего триггер может сохранить измененные данные в промежуточную временную таблицу, а затем получить данные из временной таблицы и синхронизировать их с целевой базой данных. Конечно, этот метод является наиболее инвазивным, и общие базы данных не позволяют добавлять триггеры в базу данных (влияющие на производительность).

3.3 На основе снимковCDC

Этот метод заключается в том, чтобы извлечь все текущие данные за один раз и поместить их в буфер в виде снимка, прочитать данные из исходных данных при следующей синхронизации, а затем сравнить их со снимком, чтобы узнать измененные данные. Проще говоря, это прочитать и сравнить всю таблицу, чтобы узнать измененные данные. При полном сканировании таблицы проблема заключается в производительности, поэтому этот метод обычно не используется.

3.4 На основе журналаCDC

Наиболее продвинутым и наименее инвазивным методом является метод на основе журнала.База данных будет записывать в журнал операции вставки, обновления и удаления, такие какMysqlбудутbinlog, Инкрементальная синхронизация может прочитать лог-файл, превратить бинарный файл в понятный вид, а затем повторить операции по порядку. Однако этот метод действителен только для баз данных одного типа и не может быть реализован для разнородных баз данных. И добиться этого трудно.

3.5 Описание метода инкрементной синхронизации в этом примере

В этом примере, все еще основанном наtest_userТаблица инкрементно синхронизируется, и в таблице есть поляsys_create_timeа такжеsys_update_timeЧтобы определить время создания и обновления данных (в настоящее время, если в реальности есть только одно время, оно может быть основано только на этом времени, но сложнее определить, обновляются данные или вставляются). Процесс инкрементной синхронизации выглядит следующим образом:

流程

проиллюстрировать:

  • Для каждой синхронизации временная таблица будет считываться первой, чтобы получить время данных после последней синхронизации.
  • Если синхронизация первая, то синхронизируются все, если нет, то время используется как параметр оператора запроса.
  • Прочитав данные по времени, вставьте данные в целевую таблицу.
  • Обновите время данных временной таблицы для следующей синхронизации.

4. Привязка динамических параметров Spring Batch

В соответствии с описанным выше процессом инкрементной синхронизации ключевым моментом является сохранение времени данных во временной таблице, которую можно использовать в качестве условия сравнения при чтении данных. И этот параметр времени является динамическим и передается при выполнении задачи.Spring Batch, поддерживает привязку динамических параметров, просто используйте@StepScopeАннотация может быть объединенаBeetlSql, и вскоре станет возможной добавочная синхронизация. Этот пример основан напредыдущий постпример для дальнейшего развития, вы можетеСкачать исходный кодПосмотрите полный пример.

4.1 Наследовать исходную конфигурацию базы данных и несколько источников данных

  • Исходная база данных:mytest
  • Целевая база данных:my_test1
  • база данных весенней партии:my_spring_batch
  • Таблица синхронизированных данных:test_user

4.2 Создание временной таблицы

Используя пример вsql/initCdcTempTable.sql,существуетmy_spring_batchВ библиотеке создайте временную таблицуcdc_temp, и вставьте запись как1записей, идентифицированных как синхронизацияtest_userповерхность. Здесь нам нужно только обратить внимание наlast_update_timeа такжеcurrent_update_time, первое указывает последнее время данных после последней синхронизации, а второе указывает системное время после последней синхронизации.

4.3 Добавить/изменить дао

4.3.1 Добавление временной таблицы dao и класса обслуживания

  • добавить классCdcTempRepository

В зависимости от конфигурации, посколькуcdc_tempвmy_spring_batch, в то время как его чтение и запись находятся вdao.localпакет, поэтому вам нужно добавитьdao.localpackage, затем добавьте классCdcTempRepository,Следующим образом:

@Repository
public interface CdcTempRepository extends BaseMapper<CdcTemp> {
}
  • добавить классCdcTempService, дляcdc_tempЧтение таблицы и обновление данных В основном он включает в себя две функции, одна из которых - получение текущегоcdc_tempЗаписывается, чтобы получить время последней синхронизации данных. Один из них — обновить после завершения синхронизации.cdc_tempДанные. следующим образом:
/**
 * 根据id获取cdc_temp的记录
 * @param id 记录ID
 * @return {@link CdcTemp}
 */
public CdcTemp getCurrentCdcTemp(int id){
    return cdcTempRepository.getSQLManager().single(CdcTemp.class, id);
}

/**
 * 根据参数更新cdcTemp表的数据
 * @param cdcTempId cdcTempId
 * @param status job状态
 * @param lastUpdateTime 最后更新时间
 */
public void updateCdcTempAfterJob(int cdcTempId,BatchStatus status,Date lastUpdateTime){
    //获取
    CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class, cdcTempId);
    cdcTemp.setCurrentUpdateTime(DateUtil.date());
    //正常完成则更新数据时间
    if( status == BatchStatus.COMPLETED){
        cdcTemp.setLastUpdateTime(lastUpdateTime);
    }else{
        log.info(LogConstants.LOG_TAG+"同步状态异常:"+ status.toString());
    }
    //设置同步状态
    cdcTemp.setStatus(status.name());
    cdcTempRepository.updateById(cdcTemp);
}

4.3.2 Изменить дао исходных данных

В классе дао исходных данныхOriginUserRepositoryдобавить функциюgetOriginIncreUser, эта функция соответствуетuser.mdсерединаsqlутверждение.

4.3.3 Изменить целевой дао данных

В целевом классе дао данныхTargetUserRepositoryдобавить функцию вselectMaxUpdateTime, который используется для запроса последнего времени данных после синхронизации. Поскольку sql этого метода прост, его можно использовать напрямую@SqlАннотация, как показано ниже:

@Sql(value="select max(sys_update_time) from test_user")
Date selectMaxUpdateTime();

4.4 Модификацияuser.mdсерединаsqlутверждение.

4.4.1 Добавление инкрементных данных чтения sql

существуетuser.mdДобавьте оператор sql для постепенного чтения данных следующим образом:

getOriginIncreUser
===
* 查询user数据

select * from test_user
WHERE 1=1
@if(!isEmpty(lastUpdateTime)){
AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)
@}

проиллюстрировать:

  • @начинается сbeetlСинтаксис переменной можно прочитать и логически осмыслить.Здесь это означает, что если переменнаяlastUpdateTimeЕсли он не пустой, читаем согласно этому условию.
  • lastUpdateTimeПеременная передается вызовом (Map)
  • специфическийbeetlИспользуйте синтаксис, см.официальная документация

4.4.2 Запись инструкции инкрементной вставки sql

дляMysqlбаза данных, сinsert into ... on duplicate key update ...Использование , то есть по уникальному ключу (первичному ключу или уникальному индексу), если данные уже существуют, они будут обновлены, а если их нет, они будут вставлены. существуетuser.mdфайл, добавьте следующее утверждение:

insertIncreUser
===
* 插入数据

insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#
    ,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)
ON DUPLICATE KEY UPDATE 
id = VALUES(id),
name = VALUES(name),
phone = VALUES(phone),
title = VALUES(title),
email = VALUES(email),
gender = VALUES(gender),
date_of_birth = VALUES(date_of_birth),
sys_create_time = VALUES(sys_create_time),
sys_create_user = VALUES(sys_create_user),
sys_update_time = VALUES(sys_update_time),
sys_update_user = VALUES(sys_update_user)

4.5 Написание компонентов пружинной партии

Spring BatchСтруктура файла следующая:

文件结构

4.5.1 ItemReader

Это то же самое, что и раньше, только нужно поставитьgetOriginUserфункция изменена наgetOriginIncreUserВот и все.

4.5.2 ItemWriter

Это то же самое, что и предыдущее, только нужно изменить идентификатор sql сuser.insertUserизменить наuser.insertIncreUserВот и все.

4.5.3 ДобавитьIncrementJobEndListener

После синхронизации данных последним шагом является обновление последних временных данных временной таблицы. следующим образом:

@Slf4j
public class IncrementJobEndListener extends JobExecutionListenerSupport {

    @Autowired
    private CdcTempService cdcTempService;

    @Autowired
    private TargetUserRepository targetUserRepository;

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        Date latestDate  = targetUserRepository.selectMaxUpdateTime();
        cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER,status,latestDate);
    }
}

проиллюстрировать:

  • Сначала запросите последнее время данных в текущей базе данных (selectMaxUpdateTime)
  • Обновить данные промежуточной таблицыcdc_tempсерединаlast_update_time

4.5.4 Добавить инициализацию параметра при запуске задачи

На первом шаге синхронизации данных необходимо инициализировать время последнего обновления данных во временной таблице, поэтому перед запуском задачи необходимо задать параметры задачи, чтобы можно было передать параметры времени к задаче и используется при выполнении задачи. следующим образом:

public JobParameters initJobParam(){
    CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());
    //若未初始化,则先查询数据库中对应的最后时间
    if(SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())
            || SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())){
        Date maxUpdateTime = selectMaxUpdateTime();
        //若没有数据,则按初始时间处理
        if(Objects.nonNull(maxUpdateTime)){
            currentCdcTemp.setLastUpdateTime(maxUpdateTime);
        }
    }
    return JobUtil.makeJobParameters(currentCdcTemp);
}

4.5.5 Сборка полной задачи

Наконец, вам нуженIncrementBatchConfigКонфигурация собирает чтение, обработку, запись и мониторинг.Стоит упомянуть, что при настройке компонента чтения, поскольку необходимо использовать динамические параметры, его необходимо добавить сюда.@StepScopeАннотация, также используемая в параметрахspELПолучите содержимое параметра следующим образом:

@Bean
@StepScope
public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {
    IncrementUserItemReader userItemReader = new IncrementUserItemReader();
    //设置参数,当前示例可不设置参数
    Map<String,Object> params = CollUtil.newHashMap();
    params.put(SyncConstants.STR_LAST_UPDATE_TIME,lastUpdateTime);
    userItemReader.setParams(params);

    return userItemReader;
}

4.5.6 Тестирование

Обратитесь к предыдущей статьеBeetlsqlJobTest,записыватьIncrementJobTestтестовый файл. Из-за необходимости протестировать добавочную синхронизацию процесс тестирования выглядит следующим образом:

  • Инкрементальное добавление данных перед тестированием Перед тестом в таблице исходных данных и таблице целевых данных уже есть данные, в таблице исходных данных выполните код вsql/user-data-new.sqlДобавьте новых пользователей. Обратите внимание, что из-заsys_create_timeа такжеsys_update_timeОпределяется следующим образом:
`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

Таким образом, время автоматически генерируется при вставке данных, а также автоматически обновляется при изменении данных.

  • запустить тест запускать как модульные тестыincrementJob.

  • Посмотреть Результаты После запуска результат такой:

    输出

После инкрементальной синхронизации данные выглядят следующим образом:

结果

5. Резюме

В этой статье сначала дается краткое введение в добавочную синхронизацию, перечислены наиболее часто используемые в настоящее время методы добавочной синхронизации, а затем используютсяSpring Batchа такжеBeetlSqlИнкрементальная синхронизация реализована с использованием методов на основе временных меток Этот пример имеет определенную практическую ценность, и я надеюсь, что он будет полезен разработчикам, которые занимаются синхронизацией данных или связанной с ней пакетной обработкой.