Эта статья расскажет вам о простом использовании инструмента ETL — datax.

сбор данных

在这里插入图片描述

Что такое ЭТЛ?

ETL отвечает за извлечение данных из распределенных и разнородных источников данных, таких как реляционные данные, плоские файлы данных и т. д., на временный промежуточный уровень, очистку, преобразование, интеграцию и, наконец, загрузку их в хранилище данных или витрину данных, чтобы они стали онлайн аналитическая обработка, Основы интеллектуального анализа данных.

ETL — очень важная часть хранилища данных. Это необходимый шаг, чтобы связать прошлое и будущее. По сравнению с реляционными базами данных технология хранилищ данных не имеет строгой математической теоретической основы и больше ориентирована на практические инженерные приложения. Поэтому, с точки зрения инженерного приложения, загрузите данные в соответствии с требованиями физической модели данных и выполните некоторую серию обработки данных.Процесс обработки напрямую связан с опытом.В то же время эта часть работы напрямую связано с качеством данных в хранилище данных, что влияет на качество результатов онлайновой аналитической обработки и интеллектуального анализа данных.

Что такое ДАТАКС?

DataX — это широко используемый автономный инструмент/платформа синхронизации данных в Alibaba Group.Эффективная синхронизация данных между источниками данных.

адрес, связанный с datax:GitHub.com/Alibaba/DAT…

DataX — это инструмент автономной синхронизации для разнородных источников данных, предназначенный для реализации стабильной и эффективной синхронизации данных между различными разнородными источниками данных, включая реляционные базы данных (MySQL, Oracle и т. д.), HDFS, Hive, ODPS, HBase, FTP и т. д.

在这里插入图片描述
Принцип построения фреймворка Datax:
在这里插入图片描述
Как инфраструктура автономной синхронизации данных, сама DataX построена с архитектурой Framework + plugin. Чтение и запись источника данных абстрагируются в подключаемые модули Reader/Writer и включаются во всю структуру синхронизации.

  • Читатель: Читатель — это модуль сбора данных, который отвечает за сбор данных из источников данных и отправку данных в платформу.
  • Writer: Writer — это модуль записи данных, отвечающий за непрерывную выборку данных из Framework и запись данных в место назначения.
  • Платформа: Платформа используется для соединения устройств чтения и записи в качестве канала передачи данных для обоих, а также для решения основных технических проблем, таких как буферизация, управление потоком, параллелизм и преобразование данных.

Введение основного модуля:

在这里插入图片描述
DataX завершает одно задание синхронизации данных, которое мы называем заданием.После того, как DataX получит задание, он запустит процесс для завершения всего процесса синхронизации задания. Модуль DataX Job является центральным узлом управления одним заданием и выполняет такие функции, как очистка данных, сегментация подзадач (преобразование одного вычисления задания в несколько подзадач) и управление группами задач.

После запуска DataXJob он разделит задание на несколько небольших задач (подзадач) в соответствии с различными стратегиями сегментации исходной и конечной частей для облегчения одновременного выполнения. Задача — это наименьшая единица задания DataX, и каждая задача отвечает за синхронизацию части данных.

После разделения нескольких задач задание DataX вызовет модуль планировщика, чтобы повторно объединить разделенные задачи и собрать их в TaskGroup (группу задач) в соответствии с настроенным объемом одновременных данных. Каждая группа задач отвечает за выполнение всех назначенных задач с определенным параллелизмом.Количество одновременных задач по умолчанию для одной группы задач равно 5.

Каждая задача запускается TaskGroup.После запуска задачи она запускает поток Reader—>Channel—>Writer для завершения синхронизации задачи. После запуска задания DataX задание отслеживает и ожидает завершения нескольких задач модуля TaskGroup, а задание успешно завершается после завершения всех задач TaskGroup. В противном случае он завершается аварийно, и значение выхода процесса не равно 0.

Интерпретация работы:

{
  "job": {
    "content": [
      {
        "reader": {        //读入库配置,比如说是sql server
          "name": "",      //数据源名称,别瞎写
          "parameter": {}  //数据库配置信息
        },
        "writer": {        //写入库配置,比如说是mysql
          "name": "",      //数据源名称,别瞎写
          "parameter": {}  //数据库配置信息
        }
      }
    ],
    "setting": {            //基本设置
      "speed": {            //流量控制
        "channel": 1,       //同步时候的并发数
        "byte": 104857600   //同步时候的字节大小,影响速度
      },
      "errorLimit": {       //脏数据控制
        "record": 10,       //脏数据最大记录数阈值
        "percentage": 0.05  //脏数据占比阈值
      }
    }
  }

Простой пример:

{
    "job": {
        "content": [
            {
                  "reader": {
                    "name": "mysqlreader",
                    "parameter": {
						"password": "gswzdp!1234",
                        "username": "gswzdp",
                        "connection": [{
							"querySql": [ 
								"select SUBSTR(m.MATNR,10) MATNR,k.MAKTX,m.WERKS,m.LGORT STORAGE_LOCATION,
									t.LGOBE STORAGE_LOCATION_DESC,m.charg BATCH,m.clabs MENGE,a.MEINS UNIT,
									m.ERSDA WAREHOUS_TIME
								from mchb m
								join mara a on a.MATNR = m.matnr
								join makt k on k.MATNR = m.matnr
								join t001l t on t.LGORT = m.lgort
								where m.lgort REGEXP '97'  and m.clabs > 0 limit 1"
							],
                             "jdbcUrl": ["jdbc:mysql://10.213.111.102:23306/gsepclastmile?useUnicode=true&characterEncoding=utf8&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true"]
                            }
                        ]                       
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
						"writeMode": "insert",
                        "column": ["MATNR","MAKTX","WERKS","STORAGE_LOCATION","STORAGE_LOCATION_DESC","BATCH",
							"SUPPLIER","SUPPLIER_DESC","MENGE","UNIT","WAREHOUS_TIME"],
						"preSql": [
                            "TRUNCATE TABLE DWD_MAT_MARA"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://10.212.31.83:13306/imap?useUnicode=true&characterEncoding=utf8",
                                "table": ["DWD_MAT_MARA"]
                            }
                        ],
                        "password": "Wzzhygl@1234",
                        "username": "root",
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

Создание задач на время:

Создайте файл crontab с именем testdatax, войдите в соответствующий каталог vi testdatax из командной строки, создайте и откройте файл и введите следующее: 0 3 * * * python /usr/datax/datax3/bin/datax.py /home/admin/oracle2oracle.json >>/home/hanfa.shf/log.date +\%Y\%m\%d\%H\%M\%S

0 3 * * * означает выполнять эту задачу в 3 часа каждый день. python /usr/datax/datax3/bin/datax.py представляет абсолютный путь к каталогу, в котором находится установленный DataX datax.py, обычно в каталоге datax/bin/. /usr/datax/mysqk.json.json представляет собой абсолютный путь к файлу конфигурации задания. /usr/test.shf/журнал.date +\%Y\%m\%d\%H\%M\%SУказывает выходной путь журнала, созданного при выполнении задачи, и названного в честь log.current time, который следует заменить реальным абсолютным путем. После завершения редактирования файла нажмите esc, затем shift+;, а затем введите wq для сохранения и выхода из режима редактирования файла.

Команда запуска:

Описание команды:

  • запустить службу /sbin/service cron start

  • закрыть службу /sbin/service crond stop

  • перезапустить службу /sbin/service crond перезапустить

  • перезагрузить конфигурацию /sbin/service перезагрузить crond

END

Наконец, чтобы подвести итог, на самом деле, прочитав введение и использование выше, вы можете обнаружить, что идея реализации datax представляет собой временную задачу, которая регулярно считывает данные из библиотеки A и синхронизирует их с библиотекой B, что как раз то же самое, что и наше обычное разделение чтения и записи: выполните временную задачу A, затем настройте два источника данных и регулярно считывайте процесс из источника данных A в источник данных B. Заинтересованные партнеры могут самостоятельно создать простой файл данных. Конечно, ELT инструмент который вы привели часто используется и не только datax, но и Kettle можно использовать для синхронизации данных.

在这里插入图片描述