datastage入门教程.docx
《datastage入门教程.docx》由会员分享,可在线阅读,更多相关《datastage入门教程.docx(19页珍藏版)》请在冰点文库上搜索。
datastage入门教程
简介
DataStage使用了Client-Server架构,服务器端存储所有的项目和元数据,客户端DataStageDesigner为整个ETL过程提供了一个图形化的开发环境,用所见即所得的方式设计数据的抽取清洗转换整合和加载的过程。
Datastage的可运行单元是DatastageJob,用户在Designer中对DatastageJob的进行设计和开发。
Datastage中的Job分为ServerJob,ParallelJob和MainframeJob,其中MainframeJob专供大型机上用,常用到的Job为ServerJob和ParallelJob。
本文将介绍如何使用ServerJob和ParallelJob进行ETL开发。
ServerJob
一个Job就是一个Datastage的可运行单元。
ServerJob是最简单常用的Job类型,它使用拖拽的方式将基本的设计单元-Stage拖拽到工作区中,并通过连线的方式代表数据的流向。
通过ServerJob,可以实现以下功能。
1.定义数据如何抽取
2.定义数据流程
3.定义数据的集合
4.定义数据的转换
5.定义数据的约束条件
6.定义数据的聚载
7.定义数据的写入
ParallelJob
ServerJob简单而强大,适合快速开发ETL流程。
ParallelJob与ServerJob的不同点在于其提供了并行机制,在支持多节点的情况下可以迅速提高数据处理效率。
ParallelJob中包含更多的Stage并用于不同的需求,每种Stage使用上的限制也往往大于ServerJob。
SequenceJob
SequenceJob用于Job之间的协同控制,使用图形化的方式来将多个Job汇集在一起,并指定了Job之间的执行顺序,逻辑关系和出错处理等。
数据源的连接
DataStage能够直接连接非常多的数据源,应用范围非常大,可连接的数据源包括:
∙文本文件
∙XML文件
∙企业应用程序,比如SAP、PeopleSoft、Siebel、OracleApplication
∙几乎所有的数据库系统,比如DB2、Oracle、SQLServer、SybaseASE/IQ、Teradata、Informix以及可通过ODBC连接的数据库等
∙WebServices
∙SAS、WebSphereMQ
ServerJob
ServerJob中的Stage综述
Stage是构成DatastageJob的基本元素,在ServerJob中,Stage可分为以下五种:
1.General
2.Database
3.File
4.Processing
5.RealTime
本节中将介绍如何使用Datastage开发一个ServerJob。
如图1所示:
图1.ServerJob
点击查看大图
SequentialFileStage
SequentialFileStage可用来从一个Sequential文件中获取源数据或将数据加载到一个Sequential文件中。
在使用SequentialFileStage时需要指定文件的路径和名称,文件的格式,列的定义和文件写入的类型(覆盖或追加)。
图2.SequentialFile属性框
点击查看大图
图3.SequentialFile列定义
点击查看大图
上图是本节例子中使用到的SequenceFile。
在Input页中,FileName参数代表文件的实际路径,如果文件不存在将会被自动建立。
UpdateAction中选择Overwriteexistingfile表示此文件在加载数据之前将被清空;在Format页中,定义文件的格式,例如分隔符,NULL值,首行是否为列定义等;在Column页中,需要输入文件的列定义。
HashFileStage
HashFile以主键将记录分成一个或多个部分的文件,在Datastage中通常被用做参考查找。
在进行参考查找的时候,HashFile文件会被加载到内存中,因此具有较高的查找效率。
和SequenceFile类似,使用HashFile时需要输入文件的实际地址,通过参数设置写入时的选项,并提供数据的列定义。
需要注意的是,HashFile需要指定主键,如果未指定,第一列被默认为主键。
进行参数查找时,使用主键值在HashFile中搜索,如果找到则返回该数据,如果未找到则返回NULL值。
图4.HashFile属性框
点击查看大图
TransformerStage
TransformerStage是一个重要的,功能强大的Stage。
它负责ETL过程中的数据转换操作。
在TransformerStage中可以指定数据的来源和目的地,匹配对应输入字段和输出字段,并指定转换规则和约束条件。
图5.TransformerStage列映射
点击查看大图
TransformerStage中分为5个区域:
左上方区域,是用表格形式描述的输入数据信息。
如果有多条输入数据流,则有很多表格。
本例中有一个输入,一个参照查询,因此左上方有两个表格。
右上方区域,是用表格形式描述的输出信息。
左下方区域为输入的元数据列定义,包括列名,类型和长度等属性。
右下方区域为输出的元数据列定义,包括列名,类型和长度等属性。
左上方和右上方的表格由带有流向的箭头连接,代表了字段的对应关系。
此例中,输入的数据只有一个字段EMPLOYEE_ID,通过此字段在HashFile中进行参照查找,获取EMPLOYEE_NAME字段。
如果在HashFile中找到了EMPLOYEE_NAME则将数据发送到输出端,这个条件是通过TransformerStage提高的约束功能实现,我们在约束中的定义为NOT(ISNULL(lkp_name.EMPLOYEE_ID))。
另外无论是否在HashFile中查找到对应的数据,我们都将数据记录到一个csv文件中,即对应的save_all输出。
ParallelJob
ParallelJob的Stage综述
与Serverjob相比,ParallelJob提供了更丰富的stage,增加了Development/Debug,Restructure和Transactional类的stage。
同时,对于一些在serverjob中可以在transformer中完成的功能,Paralleljob也提供了专用的stage以提高运行性能和开发效率,比如lookup,join,Compare等。
另外一个显著的区别是在ParallelJob中内置地支持job的并行运行,并行执行也就意味着数据在job中的各个stage见处理时需要处理partition和combination的问题,所以在开发job时,我们需要设定partition和combination的策略。
LookupDataSet与Lookup Stage
ParallelJob对lookup的实现做了一些调整,在ServerJob中,我们一般是用TransformerStage配合lookup数据源(一般是hash文件)来实现lookup,同一个transformer中可以同时完成多个lookup,类似于sql中的多表自然联接,如果lookup数据源使用的是databasestage而不是hashfile而且对于一条记录返回多条lookupdata的话,job会产生warning(hashfile的键唯一特性使得它不会存在这个问题,后面插入的重复数据会覆盖前面的同主键的数据)。
而在ParallelJob中,lookup需要用一个单独的stage来实现,transformer不再兼职lookup的“副业”,在一个lookupstage中,可以有一个主数据link和多个lookuplink。
同时,Parallel中的lookup还有以下的新特性
∙支持multirows,在一个lookupstage中对于一行主输入数据可以有一个lookuplink返回多于一行的lookup数据。
结果也会变成多行。
∙Parallel中不在支持hashfile,转而使用封装更强的DataSetstage,DataSet本质上也是hash数据结构,但对Job开发人员隐藏了实现细节,我们不用象开发ServerJob那样去手动设定详细参数
∙Parallel中除了支持等值lookup外,还直接支持Rangelookup和Caselesslookup。
这样我们在完成类似月份转换为季度性质的设计时就会非常的方便和自然。
类似于ServerJob中的hash文件,在ParallelJob中我们使用DataSet文件来缓存lookup数据,并加载到内存中,在DataSetstage中,我们只需要制定记录的主键和存储的文件名,Parallel引擎会为我们处理其他的操作。
但为了达到性能的最优化,我们有时需要制定DataSet的缓存策略和缓存大小,系统默认的缓存大小是3M,如果我们的lookup数据比较大,就需要设定合适的缓存大小,否则会严重影响lookup的性能。
图6.DataSet缓存设置
点击查看大图
SortStage
ParallelSortstage的行为类似于Sql中的orderby,但是比orderby提供了更多的选项。
在job中,Sortstage接收一个输入link并产生一个输出link。
对于写过sqlorderby或者排序程序的开发人员使用SortStage的基本功能应该是很简单的,但是要充分发挥Parallelstage的强大功能,我们还是需要注意下面几点:
∙并行还是串行执行,如果选择串行执行,那么Sortstage的行为就类似于ServerJob中的SortStage,整个输入数据都会按照设定的排序选项排序,但如果选择分区/并行排序,则只有每个分区内的输出是有序的,这在有些情况下是可以接受的,但在另外一些情况下会导致代码缺陷,需要根据sort的后续操作做出选择。
∙如果有可能,尽量在数据源端的数据库中进行排序,这样不但会提高数据排序的效率,还能大大减少job对内存,I/O的压力。
Sortstage只有在接收完输入之后才能完成排序,进而输出数据,使得job的后续stage都处于等待状态。
∙类似于orderby后面的字段列表,我们可以指定排序的方向,是升序还是降序,SortStage也可以指定对多个字段进行排序,排在前面的column称为主排序字段,如果排序字段中有某一个或几个字段已经是有序的,我么也可以指定其为有序,这样在排序的时候就可以提高排序的效率。
∙稳定排序(stablesort)/允许重复,stablesort默认是yes,这样如果两条记录sortkey相同的话,排序的输出和输入顺序将是相同的,如果没有选择允许重复,两条或者多条记录的sortkey相同的话,将只保留一条记录。
∙限制内存的使用,数据的排序操作是非常耗费内存的,如果不加限制,让所有的数据的排序都在内存中完成的话,job的其他操作或者其他job的执行的效率将受到严重影响,所有在SortStage中,我们可以设定此排序可以使用的最大内存数(M),这样我们在可以接受的排序效率和使用的内存数量之间找到平衡点。
Compare/Difference/ChangeCaptureStage
Compare,Difference和ChangeCaptureStage是Paralleljob中三个用于比较数据集合异同的stage,对于这三个stage本身的使用没有太多困难的地方,基本的参数和设置都很简明直观,我们的介绍主要集中在这三个stage在使用中的相同点和不同点上,一旦了解了这些stage的特点,使用的时候不但能根据需求选择正确的stage,也能根据stage特性知道需要设置哪些参数。
相同点:
∙都有两个输入,产生一个输出,
∙输入的数据主键的字段名相同,都需要指定需要比较的字段。
∙产生的结果数据中都会增加一个整型的结果字段,用于表示两行数据的比较结果
不同点:
∙CaptureChangeStage输出的是以after输入流为基础,外加changecode字段,适合和changeapply配合使用,把before输入流同步为和after一样。
∙DifferenceStage的输出是以before输入流为基础,外加changecode字段
∙CompareStage产生的结果包括before和after,以及changecode字段
下面是一个CaptureChangeStage的示例:
图7.CaptureChangeStage的示例
点击查看大图
1
2
3
4
5
6
7
BeforesourceSql:
SELECTk,v
FROM(values(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
astemp(k,v)orderbykasc
AftersourceSql:
SELECTk,v
FROM(values(1,1),(2,2),(11,11),(4,5),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10))
astemp(k,v)orderbykasc
图8.CaptureChangeStage参数设置
点击查看大图
从以上设置可以看到,我们选择了明确指定主键,剩余column都当作value,对于比较的结果,如果结果相同,则从结果中删除,也就是我们只希望看到对Before数据做Delete,Edit,和insert后产生的差异,下图是我们job运行得到的结果:
图9.Comparsion结果
从以上的结果可以看到,before和after数据有三处差异,change_code的值对应为2,3,1。
分别表示对before执行Delete,Update,Insert产生的差异。
要同步这些差异,我们只需要对before数据执行相应的Delete,Update和Insert即可实现两个数据集合的同步。
FilterStage
FilterStage顾名思义是一个用于过滤的Stage,其作用类似于我们写sql中的where子句,而且其支持的逻辑表达式和运算符也类似于sql语句的where子句,比如,在filterstage中,我们可以使用以下常见的逻辑表达式和运算符以及其组合,
∙true和false
∙六个比较运算符:
=,<>,<,>,<=,>=
∙isnull和isnotnull
∙like和between
从其语法作用上看,则类似于java或者C语言中的switchcase语句,我们可以通过设置“OutputRowOnlyOnce”选项的值来决定是否在每个casewhen子句后面是否添加break,通过添加或者删除“RejectLink”来确定是否添加一个default子句.下面是一个简单的例子。
展示了我们如何通过员工编号和薪水的组合条件来过滤员工的记录到不同的结果文件的。
图10.FilterStage的示例
点击查看大图
图11.FilterStage的设置
点击查看大图
对于每一个where条件,我们需要设置相应的输出链接,这是一个整型数字,我们可以在“LinkOrdering”页签上找到输出链接的编号和名称之间的对应关系。
另外需要注意的一点是,FilterStage不对输入的记录做任何改动,只做分发。
但是你可以手动设置输出的column,使得每个输出的column列表不一样,但只要是输入column列表的子集即可,但是对于RejectLink,Column列表是默认完全等同于输入的,且不可更改。
用于调试的Stages
我们知道DataStageServerJob中提供了Debug功能,我们在开发过程中遇到问题的时候可以让Job运行在debug模式下,仔细查看每行数据在Job中各个Stage之间的流动和转换情况,但ParallelJob并没有给我们提供调试功能,但ParallelJob用另外一种方式提供了调试的能力:
ParallelJob内置了用于调试的Stage,使用这些Stage,我们可以按照我们的需要,把我们怀疑有问题的中间数据输出,进而可以深入查找问题的根源。
在ParallelJob中提供了以下的Stage用于调试:
∙HeadStage
∙TailStage
∙SampleStage
∙PeekStage
∙RowGeneratorStage
∙ColumnGeneratorStage
我们以一个peek的实例展示一下development/debugStage的使用,其他的Stage的用法类似,可以参见后面的表格和文档。
如下图是我们的Job,DB2Stage的SourceSql如下:
1
2
3
SELECTk,vFROM(values(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),
(10,10),(11,11),(12,12),(13,13),(14,14),(15,15),(16,16),(17,17),(18,18),(19,19),(20,20))
astemp(k,v)
图12.PeekJob的示例
点击查看大图
图13.PeekStage的设置
点击查看大图
图14.Peek的结果
点击查看大图
下表简述了这些Stage的特点和用法
表1.Stage的特点和用法
Stage类型
用途
可设置项目
HeadStage
从头部开始抓取输入流的数据
一个输入一个输出
抓取的行数
从哪些分区抓取
每个分区的起始位置
每次抓取的间隔
TailStage
抓取输入流尾部的N行数据
一个输入一个输出
抓取的行数
从哪些分区抓取
SampleStage
一个输入,多个输出根据设置的策略从输入流的各个分区抓取数据,每个输出流有不同的百分比设置
百分比
随机数的种子
每个分区抓取的最多行数
PeekStage
从一个数据流中有选择地“偷窥”流经的数据,一个输入,两个输出,一个输出原样输出输入数据,一个输出生成一个文本,包含“偷窥到的数据”
每次抓取的间隔
每个分区抓取的行数
“偷窥”哪些行
输出“偷窥”结果到log还是输出
RowGeneratorStage
根据定义的数据schema,生成模拟数据,无需输入,一个输出
Schema(columnlist或者schemafile)
生成的行数
ColumnGeneratorStage
在一个输入流上添加新的column,为这些新添加的或原有的column生成模拟数据
需要生成模拟数据的column
SequenceJob
如果说每个个ServerJob或ParallelJob完成ETL流程中的一个数据抽取,转换,加载的子过程的话,那么SequenceJob的作用就是把这些子过程给串联起来,形成一个完整的全局的ETL过程。
从结构上看,一个SequenceJob类似于一个C或者Java语言的一个函数,但功能更为强大。
∙可以使用UserVariablesActivityStage定义局部变量,变量在定义的时候需要赋值,赋值表达式可以是系统变量,Job参数,Datastage的宏,常量,Routine的返回结果等,还可以是这些单独变量的条件,数学或者字符串运算后的结果。
几乎你在一个函数中能完成的局部变量定义的功能这儿都能实现。
下面的示例定义了六个变量。
∙可以调用其他的功能模块,通过JobActivityStage可以调用ServerJob,ParallelJob;通过ExecuteCommandStage调用unix/windowscmd,通过RoutineActivity支持调用datastageroutine。
∙支持循环,SequenceJob通过StartLoopActivityStage和EndLoopActivityStage提供了循环的功能。
循环变量可以是基于起始值,结束值和步长的整数循环,也可以基于给定的列表进行循环,还可以把这些循环中的临时变量传递给每个具体的循环步骤。
在StartLoopActivityStage和EndLoopActivityStage之间,可以加入任意多个的
∙支持逻辑运算,NestedConditionStage支持类似switch的逻辑,SequencerStage支持与和或的逻辑运算,通过这些Stage的组合,可以支持任意复杂的逻辑控制。
∙支持email通知,使用NotificationStage,在job运行成功,失败或者满足其他设定条件时,SequenceJob可以发送一封或者多封的通知邮件,使我们可以更方便地监控Job的运行状态,邮件的内容可以包含job的运行状态,当前的参数等等,凡是可以在UserVariablesStage中引用的变量都可以包含在邮件中,同时还可以包含我们指定的文件和SequenceJob的运行状态等。
∙支持错误处理和现场清理,使用TerminatorActivityStage和ExceptionHandlerStage,我们可以定义需要处理的错误,并在错误发生的使用根据定义的策略停止不必要的Job运行。
∙通过WaitforFileActivityStage可以支持等待时间,我们可以定义只