DataX插件开发指南.docx
《DataX插件开发指南.docx》由会员分享,可在线阅读,更多相关《DataX插件开发指南.docx(14页珍藏版)》请在冰点文库上搜索。
DataX插件开发指南
Linux公社(LinuxIDC.com)于2006年9月25日注册并开通网站,Linux现在已经成为一种广受关注和支持的一种操作系统,IDC是互联网数据中心,LinuxIDC就是关于Linux的数据中心。
LinuxIDC.com提供包括Ubuntu,Fedora,SUSE技术,以及最新IT资讯等Linux专业类网站。
DataX插件开发指南
版本号
修改内容
修改日期
修改人
V0.1
创建
2011-09-08
何健超
一、概述
DataX是一个在不同类型的数据库(文件系统)之间交换数据的工具,采用“框架+插件”的结构,框架相当于一个数据中转平台,而插件则为访问不同类型的数据库(文件系统)提供实现。
DataX插件分为Reader和Writer两类。
Reader负责从数据源端读取数据到Storage(交换空间),Writer负责将Storage中的数据写入到数据目的端。
Storage可以适配不同种类的Reader和Writer,从而实现数据同步。
目前DataX版本已经提供的Reader插件如下:
1、hdfsreader:
支持从hdfs文件系统获取数据。
2、mysqlreader:
支持从mysql数据库获取数据。
3、sqlserverreader:
支持从sqlserver数据库获取数据。
4、oraclereader:
支持从oracle数据库获取数据。
5、streamreader:
支持从stream流获取数据(常用于测试)
6、httpreader:
支持从httpURL获取数据。
提供的Writer插件如下:
1、hdfswriter:
支持向hdbf写入数据。
2、mysqlwriter:
支持向mysql写入数据。
3、sqlserverwriter:
支持向sqlserver写入数据。
4、oraclewriter:
支持向oracle写入数据。
5、streamwriter:
支持向stream流写入数据。
(常用于测试)
用户可以根据需要开发自己的Reader&Writer插件。
现在以HttpReader和StreamWriter插件为例,使用eclipse分别说明Reader和Writer插件开发过程。
二、Reader插件开发(以httpreader为例)
1、确定插件所需配置的参数
确定插件参数,并在common.plugin.ParamsKey.java中,创建静态类HttpReader,尤其注意对参数的注释尽量参照源码规范,DataX运行时,会根据此处声明的参数和注释生成对应的模板Job_xml.此处参数设置非常重要,如图:
图1
2、构建相应包和类结构
在源码文件的plugins.reader包下构建httpreader包,再在httpreader包下创建类HttpReader,并让之继承common.plugin.Reader.
图2
图3
3、实现重载方法
获得图3所示效果:
现在开始分别实现init(),connectToDb(),startRead(LineSendersender),finish()四个方法。
(1)Init():
通过从Reader间接继承自DefaultPlugin的PluginParam类型的参数param获取配置httpreader插件的参数(此处可以对参数进行检查和格式处理等操作),如图:
图4
(2)connectToDb():
本插件不需要此操作,函数为空实现。
(在数据库相关插件中,主要操作是通过DbSource.getConnection(keyId)获取connection),如在mysqlreader中,该函数为:
图5
(3)startRead(LineSendersender):
根据init()初始化的参数,连接相应的httpURL,获取其中数据并用BufferedReader封装,循环处理每行数据,调用sender.createLine()产生line,并通过line.addField(fieldStr)把每行数据切分成字段后组装成Line中的field,调用sender.sendToWriter(line)将此行数据写入Storage。
如图:
图6
(4)finish():
本插件不需要此操作,函数实现为空。
(在数据库相关插件中,主要完成关闭connection操作。
)如在mysqlreader中,该函数为:
图7
4、自定义split方法
截止到目前,已经实现了一个简单的httpreader(从一个httpURL读取数据),目前还未实现读取多个httpURL的功能,为此,可以在common.plugin.ParamsKey.java中的静态类HttpReader中,添加如下两项:
图8
然后让httpreader插件覆盖默认的split(PluginParamparam)方法。
操作如下:
(1)split(PluginParamparam)方法对param按照规则切分,生成List,DataX框架会根据此List产生插件实例执行任务。
图9
(2)为了清晰结构或者切分规则复杂,建议再构建类HttpURLSplitter辅助split方法实现切分,该类需要继承Splitter:
图10
图11
实现init()方法和split()方法即可:
图12
三、Writer插件开发(以streamwriter为例)
1、确定插件参数、构建相应包和类结构
与Reader插件类似,略过,截图如下:
图1
图2
图3
2、实现重载方法
(1)init()、connectToDb()、finish()三个方法与Reader中意义相似,此处略过。
(2)commint():
本插件不需要此操作,函数实现为空。
(在数据库相关插件中,主要完成提交任务操作。
)
(3)startWrite(LineReceiverlinereceiver):
根据init()初始化的参数,按照streamwriter插件配置信息,把linereceiver从Storage获取的数据通过xxx方法写入line中,循环处理每行数据(此处处理方式为直接输出),即可。
如图:
图4
四、插件运行配置(以httpreader为例)
1、注册插件
在conf/plugins.xml中,添加如下配置项,以注册插件:
图1
2、修改build.xml文件,打包
图2