本帖最后由 Mirinda 于 2022-2-24 10:50 编辑
问题导读:
1.对Addax熟悉吗?
2.Addax框架设计是什么?
3.Addax核心框架是什么?
概览
Addax 是一个异构数据源离线同步工具,最初来源于阿里的 DataX ,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
为了解决异构数据源同步问题,Addax将复杂的网状的同步链路变成了星型数据链路,Addax作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到Addax,便能跟已有的数据源做到无缝数据同步。
官网地址 https://wgzhao.github.io/Addax/4.0.7/quickstart/
框架设计
Addax本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。
● Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
● Writer: Writer 为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
● Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
Addax Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。
核心架构
本小节按一个Addax作业生命周期的时序图,从整体架构设计非常简要说明各个模块相互关系。
核心模块介绍
Addax 完成单个数据同步的作业,我们称之为Job,Adda x接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。Addax Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子 Task)、TaskGroup 管理等功能。
Addax Job 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task (子任务),以便于并发执行。Task 便是 Addax 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。
切分多个 Task 之后,Addax Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为5。
每一个 Task 都由 TaskGroup 负责启动,Task 启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
Addax 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非0
调度流程
举例来说,用户提交了一个作业,并且配置了 20 个并发,目的是将一个100张分表的mysql数据同步到oracle里面。调度决策思路是:
Addax Job 根据分库分表切分成了 100 个 Task。
根据 20 个并发,计算共需要分配20/5 = 4个 TaskGroup。
4 个 TaskGroup 平分切分好的 100 个 Task,每一个 TaskGroup 负责以5个并发共计运行25个 Task。
核心优势
可靠的数据质量监控
● 完美解决数据传输个别类型失真问题
支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。
● 提供作业全链路的流量、数据量运行时监控
运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。
● 提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据 Addax 认为就是脏数据。Addax 目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!
丰富的数据转换功能
作为一个服务于大数据的 ETL 工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看transformer详细介绍。
精准的速度控制
提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
{
“speed”: {
“channel”: 5,
“byte”: 1048576,
“record”: 10000
}
} 复制代码
强劲地同步性能
每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。
健壮的容错机制
作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是 Addax 的基本要求,在 Addax 的设计中,重点完善了框架和插件的稳定性。目前 Addax 可以做到线程级别、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
安装 Addax
一键安装
job/test.json
{
“content”:{
“reader”:{
“name”:“streamreader”,
“parameter”:{
“column”:[
{
“value”:“addax”,
“type”:“string”
},
{
“value”:19890604,
“type”:“long”
},
{
“value”:“1989-06-04 00:00:00”,
“type”:“date”
},
{
“value”:true,
“type”:“bool”
}
],
“sliceRecordCount”:10
}
},
“writer”:{
“name”:“streamwriter”,
“parameter”:{
“print”:true
}
}
}
} 复制代码
将上述文件保存为 job/test.json
然后执行下面的命令:
bin/addax.sh job/test.json
如果没有报错,应该会有类似这样的输出
/ _ \ || ||
/ /\ \ __|| __|| __ ___ __
| _ |/ |/ _|/ `\ / /
||||(||(||(||> <
_||/_,|_,|_,//_\
:: Addax version :: (v4.0.3-SNAPSHOT)
2021-08-23 13:45:17.199 [ main] INFO VMInfo - VMInfo# operatingSystem class=> com.sun.management.internal.OperatingSystemImpl
2021-08-23 13:45:17.223 [ main] INFO Engine -
{
“content”:
{
“reader”:{
“parameter”:{
“column”:[
{
“type”:“string”,
“value”:“addax”
},
{
“type”:“long”,
“value”:19890604
},
{
“type”:“date”,
“value”:“1989-06-04 00:00:00”
},
{
“type”:“bool”,
“value”:true
}
],
“sliceRecordCount”:10
},
“name”:“streamreader”
},
“writer”:{
“parameter”:{
“print”:true
},
“name”:“streamwriter”
}
}
}
2021-08-23 13:45:17.238 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-23 13:45:17.239 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-23 13:45:17.240 [ main] INFO JobContainer - Set jobId=0
2021-08-23 13:45:17.250 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do prepare work .
2021-08-23 13:45:17.250 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do prepare work .
2021-08-23 13:45:17.251 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-23 13:45:17.251 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-23 13:45:17.252 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-23 13:45:17.276 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-23 13:45:17.282 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for[1] tasks.
2021-08-23 13:45:17.287 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-23 13:45:17.288 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
addax 19890604 1989-06-04 00:00:00 true
2021-08-23 13:45:20.295 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-23 13:45:20.296 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do post work.
2021-08-23 13:45:20.297 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do post work.
2021-08-23 13:45:20.302 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-23 13:45:20.305 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 220 bytes | Speed 73B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.011s | Percentage 100.00%
2021-08-23 13:45:20.307 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-23 13:45:17
任务结束时刻 : 2021-08-23 13:45:20
任务总计耗时 : 3s
任务平均流量 : 73B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0 复制代码
任务配置
一个采集任务就是一个 JSON 格式配置文件,该配置文件的模板如下:
{
“job”:{
“settings”:{},
“content”:{
“reader”:{},
“writer”:{},
“transformer”:[]
}
}
} 复制代码
任务配置由 key 为 job 的字典组成,其字典元素由三部分组成:
· settings: 用来定义本次任务的一些控制参数,比如指定多少线程,最大错误率,最大错误记录条数等,这是可选配置。
· reader: 用来配置数据读取所需要的相关信息,这是必填内容
· writer: 用来配置写入数据所需要的相关信息,这是必填内容
· transformer: 数据转换规则,如果需要对读取的数据在写入之前做一些变换,可以配置该项,否则可以不配置
reader 配置项
reader 配置项依据不同的 reader 插件而有些微不同,但大部分的配置大同小异,特别是针对关系型数据库而言,其基本配置如下:
{
“name”:“mysqlreader”,
“parameter”:{
“username”:"",
“password”:"",
“column”:[],
“connection”:[
{
“jdbcUrl”:[],
“table”:[]
}
],
“where”:""
}
} 复制代码
其中 name 是插件的名称,每个插件的名称都是唯一的,每个插件更详细的配置可以参考读取插件章节的各插件内容
writer 配置项
writer 配置项和 reader 配置项差不多,其基本模板如下:
{
“name”:“mysqlwriter”,
“parameter”:{
“username”:"",
“password”:"",
“writeMode”:"",
“column”:[],
“session”:[],
“preSql”:[],
“connection”:[
{
“jdbcUrl”:"",
“table”:[]
}
]
}
} 复制代码
同样的,这里的 name 也是唯一的,每个插件更详细的配置可以参考写入插件章节的各插件内容
settings 配置项
settings 可配置的内容如下:
解释如下:
{
“setting”:{
“speed”:{
“byte”:-1,
“record”:100,
“channel”:1
},
“errorLimit”:{
“record”:0,
“percentage”:0.02
}
}
} 复制代码
speed
顾名思义,这里是用来做流控的配置项,可以从网络传输速度,每秒的记录数以及线程数上做控制,分别描述如下:
speed.byte
设置每秒可获取的字节数(Bps),一般是为了防止执行任务时将整个带宽跑满,从而影响到其他服务。如果不做限制,可设置为 -1
speed.record
设置记录每秒可获取的最大记录条数,该参数需要和 speed.byte 配合使用
speed.channel
设置通道数,该通道路确定了每个任务的线程数,目前一个channel对应5个线程,比如设置 channel 为 3,则有 3 * 5 + 1 = 16 个线程,其中一个线程为统计线程。
errorLimit
errorLimit 用来配置在数据写入报错时的行为,具体如下
errorLimit.record
允许错误的记录条数,如果超过这个数,则认为本次任务失败,否则认为成功
errorLimit.percentage
允许错误记录的比率,超过这个比率,则认为本次任务失败,否则认为成功
注意,上述参数在 conf/core.json 配置文件均有默认配置,用来控制全局的设置。
Data Reader
DataReader 插件是专门提供用于开发和测试环境中,生产满足一定规则要求的数据的插件。
在实际开发和测试中,我们需要按照一定的业务规则来生产测试数据,而不仅仅是随机内容,比如身份证号码,银行账号,股票代码等。
为什么要重复发明轮子
诚然,网络上有相当多的专门的数据生产工具,其中不乏功能强大、性能也强悍。但这些工具大部分是考虑到了数据生成这一段,而忽略了数据写入到目标端的问题,或者说有些考虑到了,但仅仅只考虑了一种或有限的几种数据库。
恰好 Addax 工具能够提供足够多的目标端写入能力,加上之前的已有的 streamReader 已经算是一个简单版的数据生成工具,因此在此功能上增加一些特定规则,再利用写入端多样性的能力,自然就成为了一个较好的数据生成工具。
配置示例
这里我把目前插件支持的规则全部列举到下面的例子中
datareader2stream.json
{
“job”: {
“setting”: {
“speed”: {
“byte”: -1,
“channel”: 1
},
“errorLimit”: {
“record”: 0,
“percentage”: 0.02
}
},
“content”: {
“reader”: {
“name”: “datareader”,
“parameter”: {
“column”: [
{
“value”: “1,100,”,
“rule”: “random”,
“type”: “double”
},
{
“value”: “DataX”,
“type”: “string”
},
{
“value”: “1”,
“rule”: “incr”,
“type”: “long”
},
{
“value”: “1989/06/04 00:00:01,-1”,
“rule”: “incr”,
“type”: “date”,
“dateFormat”: “yyyy/MM/dd hh:mm:ss”
},
{
“value”: “test”,
“type”: “bytes”
},
{
“rule”: “address”
},
{
“rule”: “bank”
},
{
“rule”: “company”
},
{
“rule”: “creditCard”
},
{
“rule”: “debitCard”
},
{
“rule”: “idCard”
},
{
“rule”: “lat”
},
{
“rule”: “lng”
},
{
“rule”: “name”
},
{
“rule”: “job”
},
{
“rule”: “phone”
},
{
“rule”: “stockCode”
},
{
“rule”: “stockAccount”
}
],
“sliceRecordCount”: 10
}
},
“writer”: {
“name”: “streamwriter”,
“parameter”: {
“print”: true,
“encoding”: “UTF-8”
}
}
}
}
} 复制代码
保存上述内容到 job/datareader2stream.json
然后执行该任务,其输出结果类似如下:
$ bin/addax.sh job/datareader2stream.json
/ _ \ | | | |
/ /\ \ __| | __| | __ ___ __
| _ |/ |/ _ |/ ` \ / /
| | | | (| | (| | (| |> <
_| |/_,|_,|_,//_\
:: Addax version :: (v4.0.2-SNAPSHOT)
2021-08-13 17:02:00.888 [ main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-13 17:02:00.910 [ main] INFO Engine -
{
“content”:
{
“reader”:{
“parameter”:{
“column”:[
{
“rule”:“random”,
“type”:“double”,
“scale”: “2”,
“value”:“1,100,”
},
{
“type”:“string”,
“value”:“DataX”
},
{
“rule”:“incr”,
“type”:“long”,
“value”:“1”
},
{
“dateFormat”:“yyyy/MM/dd hh:mm:ss”,
“rule”:“incr”,
“type”:“date”,
“value”:“1989/06/04 00:00:01,-1”
},
{
“type”:“bytes”,
“value”:“test”
},
{
“rule”:“address”
},
{
“rule”:“bank”
},
{
“rule”:“company”
},
{
“rule”:“creditCard”
},
{
“rule”:“debitCard”
},
{
“rule”:“idCard”
},
{
“rule”:“lat”
},
{
“rule”:“lng”
},
{
“rule”:“name”
},
{
“rule”:“job”
},
{
“rule”:“phone”
},
{
“rule”:“stockCode”
},
{
“rule”:“stockAccount”
}
],
“sliceRecordCount”:10
},
“name”:“datareader”
},
“writer”:{
“parameter”:{
“print”:true,
“encoding”:“UTF-8”
},
“name”:“streamwriter”
}
},
“setting”:{
“errorLimit”:{
“record”:0,
“percentage”:0.02
},
“speed”:{
“byte”:-1,
“channel”:1
}
}
}
2021-08-13 17:02:00.937 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-13 17:02:00.938 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-13 17:02:00.940 [ main] INFO JobContainer - Set jobId = 0
2021-08-13 17:02:00.976 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] do prepare work .
2021-08-13 17:02:00.977 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do prepare work .
2021-08-13 17:02:00.978 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-13 17:02:00.979 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] splits to [1] tasks.
2021-08-13 17:02:00.980 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-13 17:02:01.002 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-13 17:02:01.009 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-13 17:02:01.017 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-13 17:02:01.017 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
7.65 DataX 1 1989-06-04 00:00:01 test 天津市南京县长寿区光明路263号 交通银行 易动力信息有限公司 6227894836568607 6235712610856305437 450304194808316766 31.3732613 -125.3507716 龚军 机电工程师 13438631667 726929 8741848665
18.58 DataX 2 1989-06-03 00:00:01 test 江苏省太原市浔阳区东山路33号中国银行 时空盒数字信息有限公司 4096666711928233 6217419359154239015 220301200008188547 48.6648764 104.8567048 匡飞 化妆师 18093137306 006845 1815787371
16.16 DataX 3 1989-06-02 00:00:01 test 台湾省邯郸市清河区万顺路10号大同商行 开发区世创科技有限公司 4096713966912225 6212977716107080594 150223196408276322 29.0134395 142.6426842 支波 审核员 13013458079 020695 3545552026
63.89 DataX 4 1989-06-01 00:00:01 test 上海市辛集县六枝特区甘园路119号 中国农业银行 泰麒麟传媒有限公司 6227893481508780 6215686558778997167 220822196208286838 -71.6484635 111.8181273 敬坤 房地产客服 13384928291 174445 0799668655
79.18 DataX 5 1989-05-31 00:00:01 test 陕西省南京市朝阳区大胜路170号 内蒙古银行 晖来计算机信息有限公司 6227535683896707 6217255315590053833 350600198508222018 -24.9783587 78.017024 蒋杨 固定资产会计 18766298716 402188 9633759917
14.97 DataX 6 1989-05-30 00:00:01 test 海南省长春县璧山区碧海街147号 华夏银行 浙大万朋科技有限公司 6224797475369912 6215680436662199846 220122199608190275 -3.5088667 -40.2634359 边杨 督导/巡店 13278765923 092780 2408887582
45.49 DataX 7 1989-05-29 00:00:01 test 台湾省潜江县梁平区七星街201号 晋城商行 开发区世创信息有限公司 5257468530819766 6213336008535546044 141082197908244004 -72.9200596 120.6018163 桑明 系统工程师 13853379719 175864 8303448618
8.45 DataX 8 1989-05-28 00:00:01 test 海南省杭州县城北区天兴路11号大同商行 万迅电脑科技有限公司 6227639043120062 6270259717880740332 430405198908214042 -16.5115338 -39.336119 覃健 人事总监 13950216061 687461 0216734574
15.01 DataX 9 1989-05-27 00:00:01 test 云南省惠州市和平区海鸥街201号 内蒙古银行 黄石金承信息有限公司 6200358843233005 6235730928871528500 130300195008312067 -61.646097 163.0882369 卫建华电话采编 15292600492 001658 1045093445
55.14 DataX 10 1989-05-26 00:00:01 test 辽宁省兰州市徐汇区东山街176号 廊坊银行 创汇科技有限公司 6227605280751588 6270262330691012025 341822200908168063 77.2165746 139.5431377 池浩 多媒体设计 18693948216 201678 0692522928
2021-08-13 17:02:04.020 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-13 17:02:04.021 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-08-13 17:02:04.022 [ job-0] INFO JobContainer - Addax Reader.Job [datareader] do post work.
2021-08-13 17:02:04.025 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-13 17:02:04.028 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 1817 bytes | Speed 605B/s, 3 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-13 17:02:04.030 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-13 17:02:00
任务结束时刻 : 2021-08-13 17:02:04
任务总计耗时 : 3s
任务平均流量 : 605B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0 复制代码
配置说明
column 的配置和其他插件的配置稍有不同,一个字段由以下配置项组成
配置项 是否必须 默认值 示例 说明
value 否 无 Addax 数据值,在某些情况下为必选项
rule 否 constant idCard 数据生产规则,详细下面的描述
type 否 string double 数据值类型
dateFormat 否 yyyy-MM-dd HH:mm:ss yyyy/MM/dd HH:mm:ss 日期格式,仅在type 为 date 时有效
rule 说明
该插件的字段配置核心是 rule 字段,它用来指示应该生成什么样的数据,并依据不同规则,配合其他配置选项来生产满足期望的数据。当前 rule 的配置均为内置支持的规则,暂不支持自定义,以下详细说明
constant
constant 是 rule 的默认配置,该规则意味着要生成的数据值由 value 配置项决定,其不做任何变更。比如
{
“value”: “Addax”,
“type”: “string”,
“rule”: “constant”
} 复制代码
表示该字段生产的数据值均为 Addax
incr
incr 配置项的含义和 streamreader 插件中的 incr 含义一致,表示这是一个递增的数据生产规则,比如
{
“value”: “1,2”,
“rule”: “incr”,
“type”: “long”
} 复制代码
表示该字段的数据是一个长整形,数值从 1 开始,每次递增 2,也就是形成 1 开始,步长为 2 的递增数列。
该字段更详细的配置规则和注意事项,可以参考 streamreader 中的 incr 说明。
random
random 配置项的含义和 streamreader 插件中的 random 含义一致,表示这是一个递增的数据生产规则,比如
{
“value”: “1,10”,
“rule”: “random”,
“type”: “string”
} 复制代码
表示该字段的数据是一个长度为 1 到 10 (1和10都包括)随机字符串。
该字段更详细的配置规则和注意事项,可以参考 streamreader 中的 random 说明。
规则名称 含义 示例 数据类型 说明
address 随机生成一条基本满足国内实际情况的地址信息 辽宁省兰州市徐汇区东山街176号 string
bank 随机生成一个国内银行名称 华夏银行 string
company 随机生成一个公司的名称 万迅电脑科技有限公司 string
creditCard 随机生成一个信用卡卡号 430405198908214042 string 16位
debitCard 随机生成一个储蓄卡卡号 6227894836568607 string 19位
idCard 随机生成一个国内身份证号码 350600198508222018 string 18位,负责校验规则,头6位编码满足行政区划要求
lat 随机生成维度数据 48.6648764 double 固定7位小数 ,也可以用latitude 表示
lng 随机生成经度数据 120.6018163 double 固定7位小数,也可以使用longitude 表示
name 随机生成一个国内名字 池浩 string 暂没考虑姓氏在国内的占比度
job 随机生成一个国内岗位名称 系统工程师 string 数据来源于招聘网站
phone 随机生成一个国内手机号码 15292600492 string 暂不考虑虚拟手机号
stockCode 随机生成一个6位的股票代码 687461 string 前两位满足国内股票代码编号规范
stockAccount 随机生成一个10位的股票交易账户 0692522928 string 完全随机,不满足账户规范
uuid 随机生成一个 UUID 字符串 bc1cf125-929b-43b7-b324-d7c4cc5a75d2 string 完全随机,不满足账户规范
注意:上述表格中的规则返回的数据类型是固定的,且不支持修改,因此 type 无需配置,配置的类型也会被忽略,因为数据生成来自内部规则,所以 value 也无需配置,配置的内容也会被忽略。
MySQL Reader
MysqlReader 插件实现了从Mysql读取数据
示例
我们在 MySQL 的 test 库上创建如下表,并插入一条记录
下面的配置是读取该表到终端的作业:
CREATETABLEaddax_reader
(
c_bigint bigint,
c_varchar varchar(100),
c_timestamp timestamp,
c_text text,
c_decimal decimal(8,3),
c_mediumtextmediumtext,
c_longtext longtext,
c_int int,
c_time time,
c_datetime datetime,
c_enum enum(‘one’,‘two’,‘three’),
c_float float,
c_smallint smallint,
c_bit bit,
c_double double,
c_blob blob,
c_char char(5),
c_varbinary varbinary(100),
c_tinyint tinyint,
c_json json,
c_setSET(‘a’,‘b’,‘c’,‘d’),
c_binary binary,
c_longblob longblob,
c_mediumblobmediumblob
);
INSERTINTOaddax_reader
VALUES(2E18,
‘a varchar data’,
‘2021-12-12 12:12:12’,
‘a long text’,
12345.122,
‘a medium text’,
‘a long text’,
2^32-1,
‘12:13:14’,
‘2021-12-12 12:13:14’,
‘one’,
17.191,
126,
0,
1114.1114,
‘blob’,
‘a123b’,
‘a var binary content’,
126,
‘{“k1”:“val1”,“k2”:“val2”}’,
‘b’,
binary(1),
x’89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200’,
x’89504E470D0A1A0A0000000D’);
job/mysql2stream.json
{
“job”:{
“setting”:{
“speed”:{
“channel”:3,
“bytes”:-1
}
},
“content”:{
“reader”:{
“name”:“mysqlreader”,
“parameter”:{
“username”:“root”,
“password”:“root”,
“column”:[
“*”
],
“connection”:[
{
“table”:[
“addax_reader”
],
“jdbcUrl”:[
“jdbc:mysql://127.0.0.1:3306/test”
],
“driver”:“com.mysql.jdbc.Driver”
}
]
}
},
“writer”:{
“name”:“streamwriter”,
“parameter”:{
“print”:true
}
}
}
}
} 复制代码
将上述配置文件保存为 job/mysql2stream.json
执行采集命令
执行以下命令进行数据采集
bin/addax.sh job/mysql2stream.json
参数说明
配置项 是否必须 类型 默认值 描述
jdbcUrl 是 list 无 对端数据库的JDBC连接信息,jdbcUrl按照RDBMS官方规范,并可以填写连接附件控制信息
driver 否 string 无 自定义驱动类名,解决兼容性问题,详见下面描述
username 是 string 无 数据源的用户名
password 否 string 无 数据源指定用户名的密码
table 是 list 无 所选取的需要同步的表名,使用JSON数据格式,当配置为多张表时,用户自己需保证多张表是同一表结构
column 是 list 无 所配置的表中需要同步的列名集合,详细描述 rdbmreader
splitPk 否 string 无 使用splitPk代表的字段进行数据分片,详细描述见 rdbmreader
autoPk 否 bool false 是否自动猜测分片主键,3.2.6 版本引入
where 否 string 无 针对表的筛选条件
querySql 否 list 无 使用自定义的SQL而不是指定表来获取数据,当配置了这一项之后,Addax系统就会忽略 table,column这些配置项
driver
当前 Addax 采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver,而不是 com.mysql.jdbc.Driver。如果你需要采集的 MySQL 服务低于 5.6,需要使用到 Connector/J 5.1 驱动,则可以采取下面的步骤:
替换插件内置的驱动
rm -f plugin/reader/mysqlreader/lib/mysql-connector-java-*.jar
拷贝老的驱动到插件目录
cp mysql-connector-java-5.1.48.jar plugin/reader/mysqlreader/lib/
指定驱动类名称
在你的 json 文件类,配置 “driver”: “com.mysql.jdbc.Driver”
类型转换
目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出MysqlReader针对Mysql类型转换列表:
Addax 内部类型 MySQL 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary
请注意:
· 除上述罗列字段类型外,其他类型均不支持
· tinyint(1) Addax视作为整形
· year Addax视作为字符串类型
· bit Addax属于未定义行为
数据库编码问题
Mysql本身的编码设置非常灵活,包括指定编码到库、表、字段级别,甚至可以均不同编码。优先级从高到低为字段、表、库、实例。我们不推荐数据库用户设置如此混乱的编码,最好在库级别就统一到UTF-8。
MysqlReader底层使用JDBC进行数据抽取,JDBC天然适配各类编码,并在底层进行了编码转换。因此MysqlReader不需用户指定编码,可以自动获取编码并转码。
对于Mysql底层写入编码和其设定的编码不一致的混乱情况,MysqlReader对此无法识别,对此也无法提供解决方案,对于这类情况,导出有可能为乱码。
Hdfs Writer
HdfsWriter 提供向HDFS文件系统指定路径中写入 TEXTFile , ORCFile , PARQUET 等格式文件的能力,文件内容可与 hive 中表关联。
配置样例
{
“job”:{
“setting”:{
“speed”:{
“channel”:2,
“bytes”:-1
}
},
“content”:{
“reader”:{
“name”:“streamreader”,
“parameter”:{
“column”:[
{
“value”:“Addax”,
“type”:“string”
},
{
“value”:19890604,
“type”:“long”
},
{
“value”:“1989-06-04 00:00:00”,
“type”:“date”
},
{
“value”:true,
“type”:“bool”
},
{
“value”:“test”,
“type”:“bytes”
}
],
“sliceRecordCount”:1000
},
“writer”:{
“name”:“hdfswriter”,
“parameter”:{
“defaultFS”:“hdfs://xxx:port”,
“fileType”:“orc”,
“path”:"/user/hive/warehouse/writerorc.db/orcfull",
“fileName”:“xxxx”,
“column”:[
{
“name”:“col1”,
“type”:“string”
},
{
“name”:“col2”,
“type”:“int”
},
{
“name”:“col3”,
“type”:“string”
},
{
“name”:“col4”,
“type”:“boolean”
},
{
“name”:“col5”,
“type”:“string”
}
],
“writeMode”:“overwrite”,
“fieldDelimiter”:"\u0001",
“compress”:“SNAPPY”
}
}
}
}
}
} 复制代码
参数说明
配置项 是否必须 默认值 说明
path 是 无 要读取的文件路径
defaultFS 是 无 Hadoop hdfs 文件系统 NAMENODE 节点地址,如果配置了 HA 模式,则为 defaultFS 的值
fileType 是 无 文件的类型
fileName 是 无 要写入的文件名,用于当作前缀
column 是 无 写入的字段列表
writeMode 是 无 写入模式,支持 append, overwrite, nonConflict
fieldDelimiter 否 , 指定文本文件的字段分隔符,二进制文件不需要指定该项
encoding 否 utf-8 文件的编码配置, 目前仅支持 utf-8
nullFormat 否 无 自定义哪些字符可以表示为空,例如如果用户配置: “\N” ,那么如果源头数据是 “\N” ,视作 null 字段
haveKerberos 否 无 是否启用 Kerberos 认证,如果启用,则需要同时配置 kerberosKeytabFilePath,kerberosPrincipal
kerberosKeytabFilePath 否 无 用于 Kerberos 认证的凭证文件路径, 比如 /your/path/addax.service.keytab
kerberosPrincipal 否 无 用于 Kerberos 认证的凭证主体, 比如 addax/node1@WGZHAO.COM
compress 否 无 文件的压缩格式
hadoopConfig 否 无 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置
path
存储到 Hadoop hdfs文件系统的路径信息,HdfsWriter 会根据并发配置在 Path 目录下写入多个文件。为与hive表关联,请填写hive表在hdfs上的存储路径。例:Hive上设置的数据仓库的存储路径为:/user/hive/warehouse/ ,已建立数据库:test,表:hello;则对应的存储路径为:/user/hive/warehouse/test.db/hello (如果建表时指定了location 属性,则依据该属性的路径)
defaultFS
Hadoop hdfs文件系统 namenode 节点地址。格式:hdfs://ip:port ;例如:hdfs://127.0.0.1:9000 , 如果启用了HA,则为 servicename 模式,比如 hdfs://sandbox
fileType
描述:文件的类型,目前只支持用户配置为
· text 表示 Text file文件格式
· orc 表示 OrcFile文件格式
· parquet 表示 Parquet 文件格式
· rc 表示 Rcfile 文件格式
· seq 表示sequence file文件格式
· csv 表示普通hdfs文件格式(逻辑二维表)
column
写入数据的字段,不支持对部分列写入。为与hive中表关联,需要指定表中所有字段名和字段类型,其中:name 指定字段名,type 指定字段类型。
用户可以指定 column 字段信息,配置如下:
{
“column”:[
{
“name”:“userName”,
“type”:“string”
},
{
“name”:“age”,
“type”:“long”
},
{
“name”:“salary”,
“type”:“decimal(8,2)”
}
]
} 复制代码
对于数据类型是 decimal 类型的,需要注意:
如果没有指定精度和小数位,则使用默认的 decimal(38,10) 表示
1
如果仅指定了精度但未指定小数位,则小数位用0表示,即 decimal(p,0)
1
如果都指定,则使用指定的规格,即 decimal(p,s)
1
writeMode
写入前数据清理处理模式:
· append,写入前不做任何处理,直接使用 filename 写入,并保证文件名不冲突。
· overwrite 如果写入目录存在数据,则先删除,后写入
· nonConflict,如果目录下有 fileName 前缀的文件,直接报错。
compress
描述:hdfs文件压缩类型,默认不填写意味着没有压缩。其中:text类型文件支持压缩类型有gzip、bzip2;orc类型文件支持的压缩类型有NONE、SNAPPY(需要用户安装SnappyCodec)
hadoopConfig
hadoopConfig 里可以配置与 Hadoop 相关的一些高级参数,比如HA的配置
{
“hadoopConfig”:{
“dfs.nameservices”:“cluster”,
“dfs.ha.namenodes.cluster”:“nn1,nn2”,
“dfs.namenode.rpc-address.cluster.nn1”:“node1.example.com:8020”,
“dfs.namenode.rpc-address.cluster.nn2”:“node2.example.com:8020”,
“dfs.client.failover.proxy.provider.cluster”:“org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider”
}
} 复制代码
这里的 cluster 表示 HDFS 配置成HA时的名字,也是 defaultFS 配置项中的名字如果实际环境中的名字不是 cluster ,则上述配置中所有写有 cluster 都需要替换
haveKerberos
是否有Kerberos认证,默认 false, 如果用户配置true,则配置项 kerberosKeytabFilePath,kerberosPrincipal 为必填。
kerberosKeytabFilePath
Kerberos认证 keytab文件路径,绝对路径
kerberosPrincipal
描述:Kerberos认证Principal名,如 xxxx/hadoopclient@xxx.xxx
类型转换
Addax 内部类型 HIVE 数据类型
Long TINYINT,SMALLINT,INT,INTEGER,BIGINT
Double FLOAT,DOUBLE,DECIMAL
String STRING,VARCHAR,CHAR
Boolean BOOLEAN
Date DATE,TIMESTAMP
Bytes BINARY 复制代码
功能与限制
目前不支持:binary、arrays、maps、structs、union类型
1
Doris Writer
DorisWriter 插件用于向 Doris 数据库以流式方式写入数据。其实现上是通过访问 Doris http 连接(8030) ,然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。
Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQLReader 进行访问。
示例
假定要写入的表的建表语句如下:
CREATE
DATABASEexample_db;
CREATETABLEexample_db.table1
(
siteid INT DEFAULT’10’,
citycodeSMALLINT,
usernameVARCHAR(32)DEFAULT’’,
pv BIGINTSUMDEFAULT’0’
)AGGREGATEKEY(siteid,citycode,username)
DISTRIBUTEDBYHASH(siteid)BUCKETS10
PROPERTIES(“replication_num”=“1”); 复制代码
下面配置一个从内存读取数据,然后写入到 doris 表的配置文件
{
“job”:{
“setting”:{
“speed”:{
“channel”:2
}
},
“content”:{
“writer”:{
“name”:“doriswriter”,
“parameter”:{
“username”:“test”,
“password”:“123456”,
“batchSize”:1024,
“connection”:[
{
“table”:“table1”,
“database”:“example_db”,
“endpoint”:“http://127.0.0.1:8030/”
}
],
“loadProps”:{},
“lineDelimiter”:"\n",
“format”:“csv”
}
},
“reader”:{
“name”:“streamreader”,
“parameter”:{
“column”:[
{
“random”:“1,500”,
“type”:“long”
},
{
“random”:“1,127”,
“type”:“long”
},
{
“value”:“this is a text”,
“type”:“string”
},
{
“random”:“5,200”,
“type”:“long”
}
],
“sliceRecordCount”:100
}
}
}
}
} 复制代码
将上述配置文件保存为 job/stream2doris.json
执行下面的命令
bin/addax.sh job/stream2doris.json
输出类似如下:
2021-02-23 15:22:57.851 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO Engine -
{
“content”:{
“reader”:{
“parameter”:{
“column”:[
{
“random”:“1,500”,
“type”:“long”
},
{
“random”:“1,127”,
“type”:“long”
},
{
“type”:“string”,
“value”:“username”
}
],
“sliceRecordCount”:100
},
“name”:“streamreader”
},
“writer”:{
“parameter”:{
“password”:"*****",
“batchSize”:1024,
“connection”:[
{
“database”:“example_db”,
“endpoint”:“http://127.0.0.1:8030/”,
“table”:“table1”
}
],
“username”:“test”
},
“name”:“doriswriter”
}
},
“setting”:{
“speed”:{
“channel”:2
}
}
}
2021-02-23 15:22:57.886 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:23:00.941 [job-0] INFO JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO JobContainer -
任务启动时刻 : 2021-02-23 15:22:57
任务结束时刻 : 2021-02-23 15:23:00
任务总计耗时 : 3s
任务平均流量 : 1.56KB/s
记录写入速度 : 66rec/s
读出记录总数 : 200
读写失败总数 : 0 复制代码
参数说明
配置项 是否必须 类型 默认值 描述
endpoint 是 string 无 Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 |
username 是 string 无 HTTP 签名验证帐号
password 否 string 无 HTTP 签名验证密码
table 是 string 无 所选取的需要同步的表名
column 否 list 无 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter
batchSize 否 int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
lineDelimiter 否 string \n 每行的的分隔符,支持多个字节, 例如 \x02\x03
format 否 string csv 导入数据的格式, 可以使是 json 或者 csv
loadProps 否 map csv streamLoad 的请求参数,详情参照StreamLoad介绍页面
connectTimeout 否 int -1 StreamLoad单次请求的超时时间, 单位毫秒(ms)
endpoint
endpoint 只是的任意一个 BE 的主机名及 webserver_port 端口,官方文档描述也可以填写 FE 主机名和 http_port 端口,但实际测试一直处于连接拒绝状态。
column
该插件中的 column 不是必须项,如果没有配置该项,或者配置为 ["*"] ,则按照 reader 插件获取的字段值进行顺序拼装。否则可以按照如下方式指定需要插入的字段
{
“column”:[
“siteid”,
“citycode”,
“username”
]
} 复制代码
如何调试项目
如果你想在本地修改或者调试代码,可以参考以下方式,以下的操作均以 Intellij IDEA 开发工具为例。
调试可以分成两方方式,一种是代码和二进制包都在本地,我们称之为本地调试;另外一种源代码在本地,但二进制程序已经部署在远程,这种情况下我们称之为远程调试。
下面分别描述
本地调试
一些设定
我们假定本地部署的 Addax 在 /opt/app/addax/4.0.3 文件夹下。其 job 目录下有这样的一个 job.json 配置文件,内容如下:
job/job.json
{
“content”:{
“reader”:{
“name”:“streamreader”,
“parameter”:{
“column”:[
{
“value”:“addax”,
“type”:“string”
},
{
“value”:19890604,
“type”:“long”
},
{
“value”:“1989-06-04 00:00:00”,
“type”:“date”
},
{
“value”:true,
“type”:“bool”
}
],
“sliceRecordCount”:10
}
},
“writer”:{
“name”:“streamwriter”,
“parameter”:{
“print”:true
}
}
}
} 复制代码
上述 job 文件运行没有符合我们的预期,猜测是 streamreader 这个插件的 parseMixupFunctions 函数有问题,我想调试看具体问题在哪里。
配置 IDEA
打开 IDEA工具,并打开 addax 项目源代码,打开 plugin/reader/streamreader/StreamReader.java 文件,找到 parseMixupFunctions 函数,并在函数申明处的点击左侧边缘处增加断点。如下图所示:
点击 IDEA 的 Run->Edit Configurations… 菜单,在弹出的 Run/Debug Configurations 窗口,点击左上角的 + 按钮,然后选择选择 Application ,在右侧配置框中,填写相关信息如下:
· Name: 调试描述名称,这里可以按照自己喜好填写
· Run on: 选择 Local machine
· Build and run:
· 第一个框: 选择 JDK 版本,目前仅在1.8版本上严格测试,建议选择1.8版本
· 第二个框: 选择 addax-core 模块
· Main class: 填写 com.wgzhao.addax.core.Engine
· 点击 Modify options,在弹出的下拉框中,选择 Add VM Options,在增加的 VM Options 中,填写 -Daddax.home=/opt/app/addax/4.0.3
· Program arguments: 填写 -job job/job.json
· Working directory:填写 /opt/app/addax/4.0.3
其他保持不变,点击 Apply 按钮。得到类似下图配置内容
点击 OK 按钮保存上述配置,回到 IDEA 主窗口,在窗口菜单栏有绿色🔨的右侧,应该可以看到刚才配置的描述文件,类似下图:
点击上述截图中的绿色 DEBUG 小虫按钮,进入调试,得到类似下图的调试窗口:
远程调试
一些假定
假定程序部署在远程服务器上,需要直接调试远程服务器上运行的程序,假定远程服务器IP地址为 192.168.1.100,Addax 部署在 /opt/addax/4.0.3 目录下,其 job 文件夹下,也有一个和本地调试中描述的 job.json 文件。同样的,上述 job 文件运行没有符合我们的预期,猜测是 streamreader 这个插件的 parseMixupFunctions 函数有问题,我想调试看具体问题在哪里。
注意:远程调试需要在服务器打开 9999 端口,因此要保证服务器上的 9999 端口没有被占用,如果被占用,则需要修改此端口。
修改方式如下:
打开 bin/addax.sh 脚本
1
定位到大约24行处,找到 address=0.0.0.0:9999 字符串
1
将 9999 修改成其他未被占用的端口
1
保存退出
1
配置 IDEA
打开 IDEA工具,并打开 addax 项目源代码,打开 plugin/reader/streamreader/StreamReader.java 文件,找到 parseMixupFunctions 函数,并在函数申明处的点击左侧边缘处增加断点。如下图所示:
点击 IDEA 的 Run->Edit Configurations… 菜单,在弹出的 Run/Debug Configurations 窗口,点击左上角的 + 按钮,然后选择选择 Remove JVM Debug ,在右侧配置框中,填写相关信息如下:
· Name: 调试描述名称,这里可以按照自己喜好填写
· Configuration:
· Host: 填写远程服务器IP地址,这里填写 192.168.1.100
· Port: 填写调试端口,这里填写 9999 或者你修改过的端口
其他保持不变,点击 Apply 按钮,得到如下配置信息:
点击 OK 按钮保存并返回到 IDEA 主窗口
确保在窗口工具栏有绿色🔨的右侧选择的是上述填写 Name 的描述配置,否则在下拉框中选择刚才的配置。
运行调试
运行远程调试分成两个步骤,一是启动程序,二是调试工具联接到运行的程序上。
在远程服务器上运行如下命令:
bin/addax.sh -d job/job.json
如果运行正常,会得到如下信息:
bin/addax.sh -d job/job.json
Listening for transport dt_socket at address: 9999
表示程序监听在 9999 端口上,等待联接。
返回 IDEA 窗口,点击工具栏上绿色 DEBUG 小虫按钮,开始调试,如果运行正常,会得到类似下图的调试窗口:
数据转换
Transformer 定义
在数据同步、传输过程中,存在用户对于数据传输进行特殊定制化的需求场景,包括裁剪列、转换列等工作,可以借助ETL的T过程实现(Transformer)。Addax包含了完成的E(Extract)、T(Transformer)、L(Load)支持。
运行模型
UDF 函数
dx_substr
dx_substr(idx, pos, length) -> str
参数
· idx: 字段编号,对应record中第几个字段
· pos: 字段值的开始位置
· length: 目标字段长度
返回:从字符串的指定位置(包含)截取指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
dx_pad
dx_pad(idx, flag, length, chr)
参数
· idx: 字段编号,对应record中第几个字段
· flag: “l”,“r”, 指示是在头进行填充,还是尾进行填充
· length: 目标字段长度
· chr: 需要填充的字符
返回:如果源字符串长度小于目标字段长度,按照位置添加pad字符后返回。如果长于,直接截断(都截右边)。如果字段为空值,转换为空字符串进行pad,即最后的字符串全是需要pad的字符
举例:
· dx_pad(1,“l”,“4”,“A”): 如果 column 1 的值为 xyz=> Axyz,则转换后的值为 xyzzzzz => xyzz
· dx_pad(1,“r”,“4”,“A”), 如果 column 1 的值为 xyz=> xyzA,值为 xyzzzzz => xyzz
dx_replace
dx_replace(idx, pos, length, str) -> str
参数
· idx: 字段编号,对应record中第几个字段
· pos: 字段值的开始位置
· length: 需要替换的字段长度
· str: 要替换的字符串
返回:从字符串的指定位置(包含)替换指定长度的字符串。如果开始位置非法抛出异常。如果字段为空值,直接返回(即不参与本transformer)
举例:
· dx_replace(1,“2”,“4”,“"): 如果 column 1 的值为 addaxTest, 则转换为 daest
· dx_replace(1,“5”,“10”,“") 如果 column 1 的值为 addaxTest 则转换为 data
dx_filter
dx_filter(idx, operator, expr) -> str
参数:
· idx: 字段编号,对应record中第几个字段
· operator: 运算符, 支持 like, not like, >, =, <, >=, !=, <=
· expr: 正则表达式(java正则表达式)、值
· str: 要替换的字符串
返回:
· 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于 >, =, <都是对字段直接compare的结果.
· like , not like 是将字段转换成字符类型,然后和目标正则表达式进行全匹配。
· >, =, <, >=, !=, <= ,按照类型进行比较, 数值类型按大小比较,字符及布尔类型按照字典序比较
· 如果目标字段为空(null),对于 = null 的过滤条件,将满足条件,被过滤。!=null 的过滤条件,null不满足过滤条件,不被过滤。 like,字段为null不满足条件,不被过滤,和 not like,字段为null满足条件,被过滤。
举例
· dx_filter(1,“like”,“dataTest”)
· dx_filter(1,”>=”,“10”)
关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。
dx_groovy
dx_groovy(code, package) -> record
参数
· coee: 符合 groovy 编码要求的代码
· package: extraPackage, 列表或者为空
返回
Record 数据类型
注意:
· dx_groovy 只能调用一次。不能多次调用。
· groovy code 中支持 java.lang, java.util 的包,可直接引用的对象有 record ,以及element下的各种column(BoolColumn.class,BytesColumn.class,DateColumn.class,DoubleColumn.class,LongColumn.class,StringColumn.class)。不支持其他包,如果用户有需要用到其他包,可设置extraPackage,注意extraPackage不支持第三方jar包。
· groovy code 中,返回更新过的 Record(比如record.setColumn(columnIndex, new StringColumn(newValue));),或者null。返回null表示过滤此行。
· 用户可以直接调用静态的Util方式(GroovyTransformerStaticUtil)
举例:
groovy 实现的 subStr
Stringcode=“Column column = record.getColumn(1);\n”+
" String oriValue = column.asString();\n"+
" String newValue = oriValue.substring(0, 3);\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
dx_groovy(record);
groovy 实现的Replace
Stringcode2=“Column column = record.getColumn(1);\n”+
" String oriValue = column.asString();\n"+
" String newValue = “****” + oriValue.substring(3, oriValue.length());\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
groovy 实现的Pad
Stringcode3=“Column column = record.getColumn(1);\n”+
" String oriValue = column.asString();\n"+
" String padString = “12345”;\n"+
" String finalPad = “”;\n"+
" int NeedLength = 8 - oriValue.length();\n"+
" while (NeedLength > 0) {\n"+
“\n”+
" if (NeedLength >= padString.length()) {\n"+
" finalPad += padString;\n"+
" NeedLength -= padString.length();\n"+
" } else {\n"+
" finalPad += padString.substring(0, NeedLength);\n"+
" NeedLength = 0;\n"+
" }\n"+
" }\n"+
" String newValue= finalPad + oriValue;\n"+
" record.setColumn(1, new StringColumn(newValue));\n"+
" return record;";
Job定义
本例中,配置4个UDF。
{
“job”:{
“setting”:{
“speed”:{
“channel”:1
}
},
“content”:{
“reader”:{
“name”:“streamreader”,
“parameter”:{
“column”:[
{
“value”:“My name is xxxx”,
“type”:“string”
},
{
“value”:“password is Passw0rd”,
“type”:“string”
},
{
“value”:19890604,
“type”:“long”
},
{
“value”:“1989-06-04 00:00:00”,
“type”:“date”
},
{
“value”:true,
“type”:“bool”
},
{
“value”:“test”,
“type”:“bytes”
},
{
“random”:“0,10”,
“type”:“long”
}
],
“sliceRecordCount”:10
}
},
“writer”:{
“name”:“streamwriter”,
“parameter”:{
“print”:true,
“encoding”:“UTF-8”
}
},
“transformer”:[
{
“name”:“dx_replace”,
“parameter”:{
“columnIndex”:0,
“paras”:[
“11”,
“6”,
“wgzhao”
]
}
},
{
“name”:“dx_substr”,
“parameter”:{
“columnIndex”:1,
“paras”:[
“0”,
“12”
]
}
},
{
“name”:“dx_map”,
“parameter”:{
“columnIndex”:2,
“paras”:[
“^”,
“2”
]
}
},
{
“name”:“dx_filter”,
“parameter”:{
“columnIndex”:6,
“paras”:[
“<”,
“5”
]
}
}
]
}
}
} 复制代码
自定义函数
如果自带的函数不满足数据转换要求,我们可以在 transformer 编写满足 groovy 规范要求的代码,下面给出一个完整的例子
上述 transformer 代码针对每条记录的前面两个字段做了修改,对第一个字段的字符串,在字符串前面增加 Header_ 字符;第二个整数字段值进行倍增处理。最后执行的结果如下:
$ bin/addax.sh job/transformer_demo.json
/ _ \ || ||
/ /\ \ __|| __|| __ ___ __
| _ |/ |/ _|/ `\ / /
||||(||(||(||> <
_||/_,|_,|_,//_\
:: Addax version :: (v4.0.2-SNAPSHOT)
2021-08-04 15:45:56.421 [ main] INFO VMInfo - VMInfo# operatingSystem class=> com.sun.management.internal.OperatingSystemImpl
2021-08-04 15:45:56.443 [ main] INFO Engine -
…
2021-08-04 15:45:56.458 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-04 15:45:56.459 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-04 15:45:56.460 [ main] INFO JobContainer - Set jobId=0
2021-08-04 15:45:56.470 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-04 15:45:56.498 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-04 15:45:56.505 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for[1] tasks.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-08-04 15:45:56.520 [ taskGroup-0] INFO TransformerUtil - user config transformers [[dx_groovy]], loading…
2021-08-04 15:45:56.531 [ taskGroup-0] INFO TransformerUtil - 1 of transformer init success. name=dx_groovy, isNative=trueparameter=
{“code”:“record.setColumn(0, new StringColumn(‘Header_’ + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;”}
Header_Addax 2 1989-06-04 00:00:01 true test
Header_Addax 4 1989-06-03 00:00:01 true test
Header_Addax 6 1989-06-02 00:00:01 true test
Header_Addax 8 1989-06-01 00:00:01 true test
Header_Addax 10 1989-05-31 00:00:01 true test
Header_Addax 12 1989-05-30 00:00:01 true test
Header_Addax 14 1989-05-29 00:00:01 true test
Header_Addax 16 1989-05-28 00:00:01 true test
Header_Addax 18 1989-05-27 00:00:01 true test
Header_Addax 20 1989-05-26 00:00:01 true test
2021-08-04 15:45:59.515 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-04 15:45:59.517 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do post work.
2021-08-04 15:45:59.518 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do post work.
2021-08-04 15:45:59.521 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-04 15:45:59.524 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 330 bytes | Speed 110B/s, 3 records/s | Error 0 records, 0 bytes |
All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transformer Success 10 records | Transformer Error 0 records | Transformer Filter 0 records
| Transformer usedTime 0.383s | Percentage 100.00%
2021-08-04 15:45:59.527 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-04 15:45:56
任务结束时刻 : 2021-08-04 15:45:59
任务总计耗时 : 3s
任务平均流量 : 110B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0
2021-08-04 15:45:59.528 [ job-0] INFO JobContainer -
Transformer成功记录总数 : 10
Transformer失败记录总数 : 0
Transformer过滤记录总数 : 0 复制代码
计量和脏数据
Transform过程涉及到数据的转换,可能造成数据的增加或减少,因此更加需要精确度量,包括:
· Transform的入参Record条数、字节数。
· Transform的出参Record条数、字节数。
· Transform的脏数据Record条数、字节数。
· 如果是多个Transform,某一个发生脏数据,将不会再进行后面的transform,直接统计为脏数据。
· 目前只提供了所有Transform的计量(成功,失败,过滤的count,以及transform的消耗时间)。
涉及到运行过程的计量数据展现定义如下:
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out)| Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化。
涉及到最终作业的计量数据展现定义如下:
任务启动时刻 : 2015-03-10 17:34:21
任务结束时刻 : 2015-03-10 17:34:31
任务总计耗时 : 10s
任务平均流量 : 2.10MB/s
记录写入速度 : 100000rec/s
转换输入总数 : 1000000
转换输出总数 : 1000000
读出记录总数 : 1000000
同步失败总数 : 0
注意,这里主要记录转换的输入输出,需要检测数据输入输出的记录数量变化
/ _ \ || ||
/ /\ \ __|| __|| __ ___ __
| _ |/ |/ _|/ `\ / /
||||(||(||(||> <
_||/_,|_,|_,//_\
:: Addax version :: (v4.0.2-SNAPSHOT)
2021-08-04 15:45:56.421 [ main] INFO VMInfo - VMInfo# operatingSystem class=> com.sun.management.internal.OperatingSystemImpl
2021-08-04 15:45:56.443 [ main] INFO Engine -
…
2021-08-04 15:45:56.458 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-04 15:45:56.459 [ main] INFO JobContainer - Addax jobContainer starts job.
2021-08-04 15:45:56.460 [ main] INFO JobContainer - Set jobId=0
2021-08-04 15:45:56.470 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do prepare work .
2021-08-04 15:45:56.471 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-04 15:45:56.472 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-04 15:45:56.498 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-08-04 15:45:56.505 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for[1] tasks.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-08-04 15:45:56.520 [ taskGroup-0] INFO TransformerUtil - user config transformers [[dx_groovy]], loading…
2021-08-04 15:45:56.531 [ taskGroup-0] INFO TransformerUtil - 1 of transformer init success. name=dx_groovy, isNative=trueparameter=
{“code”:“record.setColumn(0, new StringColumn(‘Header_’ + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;”}
Header_Addax 2 1989-06-04 00:00:01 true test
Header_Addax 4 1989-06-03 00:00:01 true test
Header_Addax 6 1989-06-02 00:00:01 true test
Header_Addax 8 1989-06-01 00:00:01 true test
Header_Addax 10 1989-05-31 00:00:01 true test
Header_Addax 12 1989-05-30 00:00:01 true test
Header_Addax 14 1989-05-29 00:00:01 true test
Header_Addax 16 1989-05-28 00:00:01 true test
Header_Addax 18 1989-05-27 00:00:01 true test
Header_Addax 20 1989-05-26 00:00:01 true test
2021-08-04 15:45:59.515 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-08-04 15:45:59.517 [ job-0] INFO JobContainer - Addax Writer.Job [streamwriter]do post work.
2021-08-04 15:45:59.518 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader]do post work.
2021-08-04 15:45:59.521 [ job-0] INFO JobContainer - PerfTrace not enable!
2021-08-04 15:45:59.524 [ job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 330 bytes | Speed 110B/s, 3 records/s | Error 0 records, 0 bytes |
All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transformer Success 10 records | Transformer Error 0 records | Transformer Filter 0 records
| Transformer usedTime 0.383s | Percentage 100.00%
2021-08-04 15:45:59.527 [ job-0] INFO JobContainer -
任务启动时刻 : 2021-08-04 15:45:56
任务结束时刻 : 2021-08-04 15:45:59
任务总计耗时 : 3s
任务平均流量 : 110B/s
记录写入速度 : 3rec/s
读出记录总数 : 10
读写失败总数 : 0
2021-08-04 15:45:59.528 [ job-0] INFO JobContainer -
Transformer成功记录总数 : 10
Transformer失败记录总数 : 0
Transformer过滤记录总数 : 0 复制代码
插件开发
本指南主要面向那些需要开发符合自己需求的 Addax 插件开发人员。
插件机制
Addax 为了应对不同数据源的差异、同时提供一致地同步原语和扩展能力,采用了 框架 + 插件 的模式:
· 插件只需关心数据的读取或者写入本身。
· 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。
作为插件开发人员,则需要关注两个问题:
数据源本身的读写数据正确性。
1
如何与框架沟通、合理正确地使用框架。
1
插件视角看框架
逻辑执行模型
插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:
· Job: 用以描述从一个源头到一个目的端的同步作业,是数据同步的最小业务单元。比如:从一张 MySQL 的表同步到 PostgreSQL 的一个表。
· Task: 为最性能大化而把 Job 拆分得到的最小执行单元。比如:读一张有 1024 个分表的 MySQL 分库分表的 Job,拆分成 1024 个读 Task,用若干个并发执行。
· TaskGroup: 一组 Task 集合。在同一个 TaskGroupContainer 执行下的 Task 集合称之为 TaskGroup
· JobContainer: Job 执行器,负责 Job 全局拆分、调度、前置语句和后置语句等工作的工作单元。类似 Yarn 中的 JobTracker
· TaskGroupContainer: TaskGroup 执行器,负责执行一组 Task 的工作单元,类似 Yarn 中的 TaskTracker。
简而言之, Job拆分成Task,分别在框架提供的容器中执行,插件只需要实现 Job 和 Task 两部分逻辑。
编程接口
那么,Job 和 Task 的逻辑应是怎么对应到具体的代码中的?
首先,插件的入口类必须扩展 Reader 或 Writer 抽象类,并且实现分别实现 Job 和 Task 两个内部抽象类,Job 和 Task 的实现必须是 内部类 的形式,原因见 加载原理 一节。以 Reader 为例:
public class SomeReader
extends Reader
{
public static class Job
extends Reader.Job
{
@Override
public void init()
{
}
@Override
public void prepare()
{
}
@Override
public List<Configuration> split(int adviceNumber)
{
return null;
}
@Override
public void post()
{
}
@Override
public void destroy()
{
}
}
public static class Task
extends Reader.Task
{
@Override
public void init()
{
}
@Override
public void prepare()
{
}
@Override
public void startRead(RecordSender recordSender)
{
}
@Override
public void post()
{
}
@Override
public void destroy()
{
}
}
} 复制代码
Job 接口功能如下:
· init: Job对象初始化工作,此时可以通过 super.getPluginJobConf() 获取与本插件相关的配置。读插件获得配置中 reader 部分,写插件获得 writer 部分。
· prepare: 全局准备工作,比如 MySQL 清空目标表。
· split: 拆分 Task。参数 adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。值返回的是 Task 的配置列表。
· post: 全局的后置工作,比如 MySQL writer 同步完影子表后的 rename 操作。
· destroy: Job对象自身的销毁工作。
Task 接口功能如下:
· init:Task 对象的初始化。此时可以通过 super.getPluginJobConf() 获取与本 Task 相关的配置。这里的配置是 Job#split 方法返回的配置列表中的其中一个。
· prepare:局部的准备工作。
· startRead: 从数据源读数据,写入到 RecordSender 中。RecordSender 会把数据写入连接 Reader 和 Writer 的缓存队列。
· startWrite:从 RecordReceiver 中读取数据,写入目标数据源。RecordReceiver 中的数据来自 Reader 和 Writer 之间的缓存队列。
· post: 局部的后置工作。
· destroy: Task 对象自身的销毁工作。
需要注意的是:
· Job 和 Task 之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
· prepare 和 post 在 Job 和 Task 中都存在,插件需要根据实际情况确定在什么地方执行操作。
框架按照如下的顺序执行 Job 和 Task 的接口:
上图中,黄色表示 Job 部分的执行阶段,蓝色表示 Task 部分的执行阶段,绿色表示框架执行阶段。
相关类关系如下:
插件定义
在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:
{
“name”: “mysqlwriter”,
“class”: “com.wgzhao.addax.plugin.writer.mysqlwriter.MysqlWriter”,
“description”: “Use Jdbc connect to database, execute insert sql.”,
“developer”: “wgzhao”
}
· name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要 。
· class: 入口类的全限定名称,框架通过反射创建入口类的实例。十分重要 。
· description: 描述信息。
· developer: 开发人员。
打包发布
Addax 使用 assembly 打包,打包命令如下:
mvn clean package
mvn package assembly:single
Addax 插件需要遵循统一的目录结构:
尽管框架加载插件时,会把 ${PLUGIN_HOME} 下所有的 jar 包添加到 classpath 环境变量中,但还是推荐依赖库的jar和插件本身的jar分开存放。
特别提醒
插件的目录名字必须和 plugin.json 中定义的插件名称一致。
${ADDAX_HOME}
├── bin
│ ├── addax.sh
├── conf
│ ├── core.json
│ └── logback.xml
├── job
├── lib
│ ├── addax-common-4.0.5.jar
│ ├── addax-core-4.0.7-SNAPSHOT.jar
│ ├── addax-rdbms-4.0.5.jar
│ ├── addax-storage-4.0.5.jar
│ ├── addax-transformer-4.0.5.jar
│ ├── aircompressor-0.21.jar
│ ├── annotations-2.0.3.jar
│ ├── checker-qual-2.11.1.jar
│ ├── commons-beanutils-1.9.4.jar
├── log
├── plugin
│ ├── reader
│ │ ├── cassandrareader
│ │ │ ├── cassandrareader-4.0.5.jar
│ │ │ ├── libs
│ │ │ │ ├──
│ │ │ ├── plugin.json
│ │ │ └── plugin_job_template.json
│ └── writer
│ ├── cassandrawriter
│ │ ├── cassandrawriter-4.0.5.jar
│ │ ├── libs
│ │ │ ├──
│ │ ├── plugin.json
│ │ └── plugin_job_template.json
· ${ADDAX_HOME}/bin: 可执行程序目录
· ${ADDAX_HOME}/conf: 框架配置目录
· ${ADDAX_HOME}/lib: 框架依赖库目录
· ${ADDAX_HOME}/shared: 插件依赖目录
· ${ADDAX_HOME}/plugin: 插件目录
插件目录分为 reader 和 writer 子目录,读写插件分别存放。插件目录规范如下:
· ${PLUGIN_HOME}/libs: 插件的依赖库,为了减少程序包大小,这些依赖包都是指向 shared 目录的符号链接
· ${PLUGIN_HOME}/plugin-name-version.jar: 插件本身的jar。
· ${PLUGIN_HOME}/plugin.json: 插件描述文件。 复制代码
配置文件
Addax 使用 json 作为配置文件的格式。一个典型的 Addax 任务配置如下:
{
“job”: {
“setting”: {
“speed”: {
“byte”: -1,
“channel”: 1
}
},
“content”: {
“reader”: {
“name”: “postgresqlreader”,
“parameter”: {
“username”: “pgtest”,
“password”: “pgtest”,
“column”: [
“"
],
“connection”: [
{
“table”: [
“addax_tbl”
],
“jdbcUrl”: [
“jdbc:postgresql://localhost:5432/pgtest”
]
}
]
}
},
“writer”: {
“name”: “postgresqlwriter”,
“parameter”: {
“column”: [
"”
],
“preSql”: [
“truncate table @table”
],
“connection”: [
{
“jdbcUrl”: “jdbc:postgresql://127.0.0.1:5432/pgtest”,
“table”: [
“addax_tbl1”
]
}
],
“username”: “pgtest”,
“password”: “pgtest”,
“writeMode”: “insert”
}
}
}
}
} 复制代码
Addax 框架有 core.json 配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖 core.json 中的默认值。
配置中job.content.reader.parameter 的 value 部分会传给 Reader.Job;job.content.writer.parameter 的 value 部分会传给Writer.Job , Reader.Job 和 Writer.Job 可以通过 super.getPluginJobConf() 来获取。
如何设计配置参数
配置文件的设计是插件开发的第一步!
任务配置中 reader 和 writer 下 parameter 部分是插件的配置参数,插件的配置参数应当遵循以下原则:
· 驼峰命名:所有配置项采用小驼峰命名法,首字母小写。
· 正交原则:配置项必须正交,功能没有重复,没有潜规则。
· 富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。
· 使用正确的数据类型。比如,bool 类型的值使用 true/false,而非 “yes”/“true”/0 等。
· 合理使用集合类型,比如,用数组替代有分隔符的字符串。
· 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的 connection 参数都是如下结构:
{
“connection”: [
{
“table”: [
“table_1”,
“table_2”
],
“jdbcUrl”: [
“jdbc:mysql://127.0.0.1:3306/database_1”,
“jdbc:mysql://127.0.0.2:3306/database_1_slave”
]
},
{
“table”: [
“table_3”,
“table_4”
],
“jdbcUrl”: [
“jdbc:mysql://127.0.0.3:3306/database_2”,
“jdbc:mysql://127.0.0.4:3306/database_2_slave”
]
}
]
} 复制代码
如何使用 Configuration 类
为了简化对 json 的操作,Addax 提供了简单的 DSL 配合 Configuration 类使用。
Configuration 提供了常见的 get, 带类型get,带默认值get,set 等读写配置项的操作,以及 clone, toJSON 等方法。配置项读写操作都需要传入一个 path 做为参数,这个 path 就是 Addax 定义的 DSL。语法有两条:
子map用 .key 表示,path 的第一个点省略。
1
数组元素用 [index] 表示。
1
比如操作如下json:
{
“a”: {
“b”: {
“c”: 2
},
“f”: [
1,
2,
{
“g”: true,
“h”: false
},
4
]
},
“x”: 4
} 复制代码
比如调用 configuration.get(path) 方法,当path为如下值的时候得到的结果为:
· x:4
· a.b.c:2
· a.b.c.d:null
· a.b.f[0]:1
· a.b.f[2].g:true 复制代码
注意,因为插件看到的配置只是整个配置的一部分。使用 Configuration 对象时,需要注意当前的根路径是什么。
更多 Configuration 的操作请参考 Configuration.java 。
插件数据传输
跟一般的 生产者-消费者 模式一样,Reader 插件和 Writer 插件之间也是通过 channel 来实现数据的传输的。channel 可以是内存的,也可能是持久化的,插件不必关心。插件通过 RecordSender 往 channel 写入数据,通过 RecordReceiver 从 channel 读取数据。
channel 中的一条数据为一个 Record 的对象,Record 中可以放多个 Column 对象,这可以简单理解为数据库中的记录和列。
Record 有如下方法:
public interface Record
{
// 加入一个列,放在最后的位置
void addColumn(Column column);
// 在指定下标处放置一个列
void setColumn(int i, final Column column);
// 获取一个列
Column getColumn(int i);
// 转换为json String
String toString();
// 获取总列数
int getColumnNumber();
// 计算整条记录在内存中占用的字节数
int getByteSize();
} 复制代码
因为 Record 是一个接口,Reader 插件首先调用 RecordSender.createRecord() 创建一个 Record 实例,然后把 Column 一个个添加到 Record 中。
Writer 插件调用 RecordReceiver.getFromReader() 方法获取 Record,然后把 Column 遍历出来,写入目标存储中。当 Reader 尚未退出,传输还在进行时,如果暂时没有数据 RecordReceiver.getFromReader() 方法会阻塞直到有数据。如果传输已经结束,会返回null,Writer 插件可以据此判断是否结束 startWrite 方法。
类型转换
为了规范源端和目的端类型转换操作,保证数据不失真,Addax 支持六种内部数据类型:
· Long:定点数(Int、Short、Long、BigInteger等)。
· Double:浮点数(Float、Double、BigDecimal(无限精度)等)。
· String:字符串类型,底层不限长,使用通用字符集(Unicode)。
· Date:日期类型。
· Timestamp: 时间戳
· Bool:布尔值。
· Bytes:二进制,可以存放诸如MP3等非结构化数据。
对应地,有 DateColumn、LongColumn、DoubleColumn、BytesColumn、StringColumn 、BoolColumn 和 TimestampColumn 七种 Column 的实现。
Column 除了提供数据相关的方法外,还提供一系列以 as 开头的数据类型转换转换方法。
Addax的内部类型在实现上会选用不同的java类型:
内部类型 实现类型 备注
Date java.util.Date
Timestamp java.sql.Timestamp 可以精确到纳秒
Long java.math.BigInteger 使用无限精度的大整数,保证不失真
Double java.lang.String 用String表示,保证不失真
Bytes byte[]
String java.lang.String
Bool java.lang.Boolean
类型之间相互转换的关系如下:
脏数据处理
什么是脏数据
目前主要有三类脏数据:
Reader读到不支持的类型、不合法的值。
1
不支持的类型转换,比如:Bytes 转换为 Date。
1
写入目标端失败,比如:写 MySQL 整型长度超长。
1
如何处理脏数据
在 Reader.Task 和 Writer.Task 中,通过 AbstractTaskPlugin.getPluginCollector() 可以拿到一个 TaskPluginCollector,它提供了一系列 collectDirtyRecord 的方法。当脏数据出现时,只需要调用合适的 collectDirtyRecord 方法,把被认为是脏数据的 Record 传入即可。
用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。
加载原理
框架扫描 plugin/reader 和 plugin/writer目录,加载每个插件的 plugin.json 文件。
1
以 plugin.json 文件中 name 为 key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。
1
用户在插件中在 reader/writer 配置的 name 字段指定插件名字。框架根据插件的类型(reader/writer)和插件名称去插件的路径下扫描所有的jar,加入 classpath。
1
根据插件配置中定义的入口类,框架通过反射实例化对应的 Job 和 Task 对象。
最新经典文章,欢迎关注公众号