Инструмент автономной синхронизации Alibaba Cloud Скимминг исходного кода DataX

Java база данных MySQL исходный код Алибаба

Недавно я выполнял некоторую работу, связанную с миграцией данных, и, наконец, принял DataX.Арендодатель также в общих чертах ознакомился с исходным кодом Datax, чтобы узнать правду и узнать причину.

загрузка кода

Я считаю, что эта часть нас как программистов, мы все знаем, что github - это большая семья, поэтому арендодатель может просто пойти на github и забрать его.Проект построен на основе maven, и вы можете напрямую использовать IDE для импорта проект.

git clone git@github.com:alibaba/DataX.git

быстрый старт

Эта часть арендодателя мало что говорит, потому что арендодатель ясно дал понять в предыдущей статье, заинтересованные друзья, пожалуйста, переместитеАли автономный инструмент синхронизации данных DataX вытаптывает запись

Обзор DataX (выдержка с официального сайта)

DataX结构图Структурная схема DataX

  • концепт дизайна
    Чтобы решить проблему синхронизации разнородных источников данных, DataX превращает сложный канал синхронизации сетки в звездный канал передачи данных, а DataX отвечает за соединение различных источников данных в качестве промежуточной несущей передачи. Когда вам нужно получить доступ к новому источнику данных, вам нужно только подключить этот источник данных к DataX, чтобы добиться бесшовной синхронизации данных с существующим источником данных.

  • Текущий статус использования
    DataX широко используется в Alibaba Group, занимается автономной синхронизацией больших данных и стабильно работает в течение 6 лет. В настоящее время многозадачность 8 Вт синхронизируется каждый день, а ежедневный объем передачи данных превышает 300 ТБ.

Анализ исходного кода

Как показано на рисунке выше, большая часть исходного кода DataX используется для адаптации к средствам чтения и записи различных источников данных. Базовая структура преобразования является базовой моделью. Когда мы открываем эту модель, мы можем непосредственно увидеть модель с именем Engine. , документ

DataX结构图Структурная схема DataX

Согласно нашим привычкам кода, это должен быть код входа DataX, как показано ниже, код — это код для удаления обработки исключений, он очень простой, всего три строки, давайте посмотрим на метод входа


public static void main(String[] args) throws Exception {
      int exitCode = 0;
      Engine.entry(args);
      System.exit(exitCode);
  }

Из кода мы знаем, что этот метод предназначен для инициализации некоторых параметров запуска и вызова метода запуска.


public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");

        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);

        String jobPath = cl.getOptionValue("job");

        // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");

        Configuration configuration = ConfigParser.parse(jobPath);

        long jobId;
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            jobId = Long.parseLong(jobIdString);
        } else {
            // only for dsc & ds & datax 3 update
            String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
            String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
            String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
            List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                    dsJobUrlPatternString, dsTaskGroupUrlPatternString);
            jobId = parseJobIdFromUrl(patternStringList, jobPath);
        }

        boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
        if (!isStandAloneMode && jobId == -1) {
            // 如果不是 standalone 模式,那么 jobId 一定不能为-1
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
        }
        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

        //打印vmInfo
        VMInfo vmInfo = VMInfo.getVmInfo();
        if (vmInfo != null) {
            LOG.info(vmInfo.toString());
        }

        LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

        LOG.debug(configuration.toJSON());

        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        engine.start(configuration);
    }

Следующий код предназначен для получения конфигурации в файле конфигурации, инициализации определенного типа контейнера, инициализации работающего контейнера задания или задачи и запуска логики задания или задачи плагина.


/* check job model (job/task) first */
   public void start(Configuration allConf) {

       // 绑定column转换信息
       ColumnCast.bind(allConf);

       /**
        * 初始化PluginLoader,可以获取各种插件配置
        */
       LoadUtil.bind(allConf);

       boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
               .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
       //JobContainer会在schedule后再行进行设置和调整值
       int channelNumber =0;
       AbstractContainer container;
       long instanceId;
       int taskGroupId = -1;
       if (isJob) {
           allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
           container = new JobContainer(allConf);
           instanceId = allConf.getLong(
                   CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

       } else {
           container = new TaskGroupContainer(allConf);
           instanceId = allConf.getLong(
                   CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
           taskGroupId = allConf.getInt(
                   CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
           channelNumber = allConf.getInt(
                   CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
       }

       //缺省打开perfTrace
       boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
       boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

       //standlone模式的datax shell任务不进行汇报
       if(instanceId == -1){
           perfReportEnable = false;
       }

       int priority = 0;
       try {
           priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
       }catch (NumberFormatException e){
           LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
       }

       Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
       //初始化PerfTrace
       PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
       perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
       container.start();
   }

Файл конфигурации основного модуля DataX

Из файла конфигурации видно, что минимальная куча памяти JVM, необходимая для DataX, составляет 1G, что представляет собой небольшую проблему, с которой арендодатель сталкивался ранее, используя DataX, а другие конфигурации являются некоторыми общими конфигурациями.


{
    "entry": {
        "jvm": "-Xms1G -Xmx1G",
        "environment": {}
    },
    "common": {
        "column": {
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "timeFormat": "HH:mm:ss",
            "dateFormat": "yyyy-MM-dd",
            "extraFormats":["yyyyMMdd"],
            "timeZone": "GMT+8",
            "encoding": "utf-8"
        }
    },
    "core": {
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "timeout": 10000,
            "reportDataxLog": false,
            "reportPerfLog": false
        },
        "transport": {
            "channel": {
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "speed": {
                    "byte": -1,
                    "record": -1
                },
                "flowControlInterval": 20,
                "capacity": 512,
                "byteCapacity": 67108864
            },
            "exchanger": {
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger",
                "bufferSize": 32
            }
        },
        "container": {
            "job": {
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }

        },
        "statistics": {
            "collector": {
                "plugin": {
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector",
                    "maxDirtyNumber": 10
                }
            }
        }
    }
}

резюме

На этот раз арендодатель только грубо разобрался с кодом DataX, и уровень немного водянистый Я считаю, что арендодатель, который любит учиться, может день.

Автор: haifeiWu Исходная ссылка:Уууу. Пересаживайтесь на студию. Можете/статья/201…Заявление об авторских правах: Неспециальное заявление является оригинальной работой данного сайта.При перепечатке указывайте автора и оригинальную ссылку.