Недавно я выполнял некоторую работу, связанную с миграцией данных, и, наконец, принял DataX.Арендодатель также в общих чертах ознакомился с исходным кодом Datax, чтобы узнать правду и узнать причину.
загрузка кода
Я считаю, что эта часть нас как программистов, мы все знаем, что github - это большая семья, поэтому арендодатель может просто пойти на github и забрать его.Проект построен на основе maven, и вы можете напрямую использовать IDE для импорта проект.
git clone git@github.com:alibaba/DataX.git
быстрый старт
Эта часть арендодателя мало что говорит, потому что арендодатель ясно дал понять в предыдущей статье, заинтересованные друзья, пожалуйста, переместитеАли автономный инструмент синхронизации данных DataX вытаптывает запись
Обзор DataX (выдержка с официального сайта)
-
концепт дизайна
Чтобы решить проблему синхронизации разнородных источников данных, DataX превращает сложный канал синхронизации сетки в звездный канал передачи данных, а DataX отвечает за соединение различных источников данных в качестве промежуточной несущей передачи. Когда вам нужно получить доступ к новому источнику данных, вам нужно только подключить этот источник данных к DataX, чтобы добиться бесшовной синхронизации данных с существующим источником данных. -
Текущий статус использования
DataX широко используется в Alibaba Group, занимается автономной синхронизацией больших данных и стабильно работает в течение 6 лет. В настоящее время многозадачность 8 Вт синхронизируется каждый день, а ежедневный объем передачи данных превышает 300 ТБ.
Анализ исходного кода
Как показано на рисунке выше, большая часть исходного кода DataX используется для адаптации к средствам чтения и записи различных источников данных. Базовая структура преобразования является базовой моделью. Когда мы открываем эту модель, мы можем непосредственно увидеть модель с именем Engine. , документ
Согласно нашим привычкам кода, это должен быть код входа DataX, как показано ниже, код — это код для удаления обработки исключений, он очень простой, всего три строки, давайте посмотрим на метод входа
public static void main(String[] args) throws Exception {
int exitCode = 0;
Engine.entry(args);
System.exit(exitCode);
}
Из кода мы знаем, что этот метод предназначен для инициализации некоторых параметров запуска и вызова метода запуска.
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
Configuration configuration = ConfigParser.parse(jobPath);
long jobId;
if (!"-1".equalsIgnoreCase(jobIdString)) {
jobId = Long.parseLong(jobIdString);
} else {
// only for dsc & ds & datax 3 update
String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
dsJobUrlPatternString, dsTaskGroupUrlPatternString);
jobId = parseJobIdFromUrl(patternStringList, jobPath);
}
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
if (!isStandAloneMode && jobId == -1) {
// 如果不是 standalone 模式,那么 jobId 一定不能为-1
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
}
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
//打印vmInfo
VMInfo vmInfo = VMInfo.getVmInfo();
if (vmInfo != null) {
LOG.info(vmInfo.toString());
}
LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
LOG.debug(configuration.toJSON());
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
}
Следующий код предназначен для получения конфигурации в файле конфигурации, инициализации определенного типа контейнера, инициализации работающего контейнера задания или задачи и запуска логики задания или задачи плагина.
/* check job model (job/task) first */
public void start(Configuration allConf) {
// 绑定column转换信息
ColumnCast.bind(allConf);
/**
* 初始化PluginLoader,可以获取各种插件配置
*/
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
//JobContainer会在schedule后再行进行设置和调整值
int channelNumber =0;
AbstractContainer container;
long instanceId;
int taskGroupId = -1;
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
} else {
container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}
//缺省打开perfTrace
boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);
//standlone模式的datax shell任务不进行汇报
if(instanceId == -1){
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();
}
Файл конфигурации основного модуля DataX
Из файла конфигурации видно, что минимальная куча памяти JVM, необходимая для DataX, составляет 1G, что представляет собой небольшую проблему, с которой арендодатель сталкивался ранее, используя DataX, а другие конфигурации являются некоторыми общими конфигурациями.
{
"entry": {
"jvm": "-Xms1G -Xmx1G",
"environment": {}
},
"common": {
"column": {
"datetimeFormat": "yyyy-MM-dd HH:mm:ss",
"timeFormat": "HH:mm:ss",
"dateFormat": "yyyy-MM-dd",
"extraFormats":["yyyyMMdd"],
"timeZone": "GMT+8",
"encoding": "utf-8"
}
},
"core": {
"dataXServer": {
"address": "http://localhost:7001/api",
"timeout": 10000,
"reportDataxLog": false,
"reportPerfLog": false
},
"transport": {
"channel": {
"class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
"speed": {
"byte": -1,
"record": -1
},
"flowControlInterval": 20,
"capacity": 512,
"byteCapacity": 67108864
},
"exchanger": {
"class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
"bufferSize": 32
}
},
"container": {
"job": {
"reportInterval": 10000
},
"taskGroup": {
"channel": 5
},
"trace": {
"enable": "false"
}
},
"statistics": {
"collector": {
"plugin": {
"taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
"maxDirtyNumber": 10
}
}
}
}
}
резюме
На этот раз арендодатель только грубо разобрался с кодом DataX, и уровень немного водянистый Я считаю, что арендодатель, который любит учиться, может день.
Автор: haifeiWu Исходная ссылка:Уууу. Пересаживайтесь на студию. Можете/статья/201…Заявление об авторских правах: Неспециальное заявление является оригинальной работой данного сайта.При перепечатке указывайте автора и оригинальную ссылку.