datax 学习记录
创始人
2025-06-01 15:28:34
0
  • 初识

  DataX是由阿里巴巴研发并开源的一个异构数据源离线同步工具,DataX实现了包括MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS等各种异构数据源之间高效的数据同步功能。

  • 了解datax基本概念和下载方式

   参考:https://zhuanlan.zhihu.com/p/564944396 

  • datax源码下载和编译 (编译后的文件内容可以与第一点同步看)

   参考:https://blog.csdn.net/slxz001/article/details/122984162?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-122984162-blog-124238427.pc_relevant_multi_platform_whitelistv3&spm=1001.2101.3001.4242.1&utm_relevant_index=3

  • springboot集成datax (三个同步看)

   参考:https://blog.csdn.net/slxz001/article/details/122984162?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-122984162-blog-124238427.pc_relevant_multi_platform_whitelistv3&spm=1001.2101.3001.4242.1&utm_relevant_index=3

   参考:https://blog.csdn.net/l2931050/article/details/124238427

   参考:https://mp.weixin.qq.com/s/ZF8tjxGEQrPPUw1GxVVXOA

问题:

   没懂System.setProperty("datax.home", dataxHome); datax.home值怎么定义的(有博主说是datax下载后,本地的存储地址,但测试环境是怎么定义的?)-----经过测试,如果已经在本地项目中导入的datax相关jar 包,datax.home值也可以是本地项目datax文件的绝对路径,只找到了获取classes文件的绝对路径方法(没有找到获取src--main--java下datax目录的方法),所以把本地的datax文件放到resources目录下。也可以使用datax下载后,解压后本地的存储地址。两种方式都可以。

答疑解惑:Java集成datax https://blog.csdn.net/weixin_34390105/article/details/89627226

https://blog.csdn.net/dz77dz/article/details/106055693

https://blog.csdn.net/qq_20042935/article/details/122573669?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-122573669-blog-106055693.pc_relevant_recovery_v2&spm=1001.2101.3001.4242.1&utm_relevant_index=3

  • 从mysql到mysql数据读取到写入,同步表数据配置data.json具体值的定义及使用:

  https://www.cnblogs.com/EnzoDin/p/9979638.html

  • json文件中的WriteModel详解(insert\update区别)

  https://blog.csdn.net/shenshengsu1990/article/details/120163634

注:如果使用update,且同步到mysql库中,因为执行ON DUPLICATE KEY UPDATE语句时,mysql是通过主键或者唯一索引来判断两条数据是否重复,然后做更新/新增。

  • datax启动,同步操作遇到的问题及解决:

  https://developer.aliyun.com/article/780924

  • datax批量生成json同步文件(Java版本)

   利用类方式拼接:https://codeleading.com/article/81471734016/

   利用map拼接:https://blog.csdn.net/jsbylibo/article/details/97940824

  • 同步上百张表时,定义一个模版json,后续通过在数据库中配置同步表的信息,动态查询实现替换json文件中变量,来同步数据。

  • json 文件具体参数说明:

   参考:https://www.cnblogs.com/EnzoDin/p/9979638.html

    --column

   描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如['*']。

  支持列裁剪,即列可以挑选部分列进行导出。

  支持列换序,即列可以不按照表schema信息进行导出。

  支持常量配置,用户需要按照Mysql SQL语法格式: ["id", "`table`", "1", "'bazhen.csy'", "null",     "to_char(a + 1)", "2.3" , "true"] id为普通列名,`table`为包含保留字的列名,1为整形数字常 量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔 值。

  splitPk字段作用, 具体怎么体现出来?

 --where

描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。

where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

梳理:

  1. 在本地安装datax,有两种方式,第一种:直接下载DataX工具包;第二种:下载DataX源码,自己编译(第二种没懂怎么打包)。

  下载地址:https://github.com/alibaba/DataX/blob/master/userGuid.md

  1. 将下载的datax.tar解压后,存储在本地。解压后的文件夹包含(bin、config、job、lib、plugin、script、tmp),在本地springboot项目中,将解压后datax文件夹拷贝到resources目录下,只留config、job、lib、plugin文件夹,plugin文件夹内有很多数据源,只留自己需要的读写文件夹即可。

说明:放到resource原因是,因为启动datax时,会赋值datax.home 属性,可以直接通过下面getCurrentClassPath()方法,获取到datax的class文件目录。有一种说法,datax.home 的属性值要定义为本地datax.tar解压后的目录。试了两种方法都可以,所以我目前datax.home属性使用的是项目下datax文件夹目录,方便以后上测试和生产,不用在服务器中存储一份。

public class TestMain {public static String getCurrentClasspath(){ClassLoader classLoader = Thread.currentThread().getContextClassLoader();String currentClasspath = classLoader.getResource("").getPath();// 当前操作系统String osName = System.getProperty("os.name");if (osName.startsWith("Win")) {// 删除path中最前面的/currentClasspath = currentClasspath.substring(1, currentClasspath.length()-1);}return currentClasspath;}public static void main(String[] args) {System.setProperty("datax.home","D:\\datax\\datax");String[] datxArgs2 = {"-job", getCurrentClasspath()+"/datax/test.json", "-mode", "standalone", "-jobid", "-1"};try {Engine.entry(datxArgs2);} catch (Throwable e) {e.printStackTrace();}}
}
  1. pom文件依赖datax-core、datax-common和需要的reader和writer。如果jar包不在私服霍maven仓库中,只在本地,可利用下面方式导入。

  1. job文件夹内,加入需要做同步数据使用的json文件,例如从mysql同步到mysql,oracle同步到mysql。

{"job": {"setting": {"speed": {"channel": 4}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"],"querySql": ["select t.id,t.name,t.status from users t"]}]}},"writer": {"name": "mysqlwriter","parameter": {"username": "root","password": "123456","writeMode": "insert","column": ["id","name","status"],"connection": [{"table": ["temp_users"],"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test"}]}}}]}
}
  1. 编写main方法,测试启动,可使用上面2的代码块。

  • 环境变量设置datax.home(或者利用System#setProperty(String))和一些需要替换脚本中的变量:脚本中${}占位符的变量将被系统变量替换。

  • 构造参数数组:{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}

  • 调用Engin#main(String[])或者Engine#entry(String[])

备注:从读的数据源到写入的数据源,里面的column定义数量、顺序要一致,否则同步失败或数据错乱。如果读的表字段与写入表的字段名不一样,也没关系,只要数量和顺序一致就可以同步成功。

  • 将datax输入的日志信息,同步写入数据库中

 参考:https://blog.csdn.net/ffyangzhch/article/details/88784007?spm=1001.2101.3001.6650.9&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-9-88784007-blog-112097501.pc_relevant_3mothn_strategy_and_data_recovery&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-9-88784007-blog-112097501.pc_relevant_3mothn_strategy_and_data_recovery&utm_relevant_index=18

  根据上面博主的方法,JobContainer的**start()**方法重写后,父类里面是无返回值无参的,如果改成DataxResult类型,那么其它类继承这个AbstractContainer类,然后实现star()方法的地方都要加上DataxResult,改很多源代码比较麻烦。所以换了一种方法,但思路基本差不多。

 步骤如下:

 1. 在源码文件夹中新建一个实体类,字段可以写原控制输出的那些信息,注意写构造器(随便找个合适的文件夹位置就行);

 2. 在JobContainer类中,先定义新建的实体变量dataxResultBo(使用static修饰);

 3. 在JobContainer类中的logStatistic()方法中最后,将原输出的日志信息后面,为dataxResultBo实体类赋值(构造器赋值);

 4. 定义一个返回实体的方法,return dataxResultBo;

 5. Engin.entry() 执行完成后,通过JobContainer.返回实体的方法,即可获取到日志实体类dataxResultBo,然后获取具体属性,插入数据库。

 备注:新加的实体和方法,都是在源码中修改的,所以涉及到修改源码即打包。

          修改jar源码方式:https://blog.csdn.net/Dream_Weave/article/details/115445122

  • datax同步速率优化:

  1. 考虑过使用多线程同步数据,但使用多线程后(线程池),datax同步数据会出现数据错乱,即读A表,写入B表。

  1. 通过datax配置文件中 bytes、records、channel 方面优化

查看项目运行时jvm内存情况:https://www.cnblogs.com/curedfisher/p/12839485.html

datax 并发数channel参数理解及优化:https://www.sohu.com/a/313886409_612370

     https://segmentfault.com/a/1190000041036461

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...