Kettle使用文档.docx

上传人:b****8 文档编号:12395655 上传时间:2023-06-05 格式:DOCX 页数:44 大小:3.42MB
下载 相关 举报
Kettle使用文档.docx_第1页
第1页 / 共44页
Kettle使用文档.docx_第2页
第2页 / 共44页
Kettle使用文档.docx_第3页
第3页 / 共44页
Kettle使用文档.docx_第4页
第4页 / 共44页
Kettle使用文档.docx_第5页
第5页 / 共44页
Kettle使用文档.docx_第6页
第6页 / 共44页
Kettle使用文档.docx_第7页
第7页 / 共44页
Kettle使用文档.docx_第8页
第8页 / 共44页
Kettle使用文档.docx_第9页
第9页 / 共44页
Kettle使用文档.docx_第10页
第10页 / 共44页
Kettle使用文档.docx_第11页
第11页 / 共44页
Kettle使用文档.docx_第12页
第12页 / 共44页
Kettle使用文档.docx_第13页
第13页 / 共44页
Kettle使用文档.docx_第14页
第14页 / 共44页
Kettle使用文档.docx_第15页
第15页 / 共44页
Kettle使用文档.docx_第16页
第16页 / 共44页
Kettle使用文档.docx_第17页
第17页 / 共44页
Kettle使用文档.docx_第18页
第18页 / 共44页
Kettle使用文档.docx_第19页
第19页 / 共44页
Kettle使用文档.docx_第20页
第20页 / 共44页
亲,该文档总共44页,到这儿已超出免费预览范围,如果喜欢就下载吧!
下载资源
资源描述

Kettle使用文档.docx

《Kettle使用文档.docx》由会员分享,可在线阅读,更多相关《Kettle使用文档.docx(44页珍藏版)》请在冰点文库上搜索。

Kettle使用文档.docx

Kettle使用文档

1.Kettle简介

Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。

Kettle中文名称叫水壶,该项目的主程序员MATT希望把各种数据放到一个壶里,然后以一种指定的格式流出。

Kettle主要由资源库、数据库、作业(job)、转换、步骤五部分组成,资源库是用来存储用户所编写的作业和转换(即kjb文件和ktr文件)一般是在数据库中存储,方便用户的查找和使用。

数据库就是处理数据是需要的数据库。

作业是用来确定一个工程中需要使用的转换和转换的执行顺序。

转换是数据在ktr文件中的具体转换过程,类似于Java的一个方法,而作业就类似于java的一个类,它可以调用各种不同的方法(转换)。

2.Kettle框架搭建

(1)下载安装

可以从http:

//kettle.pentaho.org下载最新版的Kettle软件,同时,Kettle是绿色软件,下载后,解压到任意目录即可。

(2)配置环境变量

使用Kettle前提是配置好Java的环境变量,因为Kettle是java编写,需要本地的JVM的运行环境。

配置Java环境变量可参考:

配置Kettle环境变量步骤:

一、在系统的环境变量中添加KETTLE_HOME变量,目录指向kettle的安装目录:

D:

kettledata-integration(具体以安装路径为准)

二、新建系统变量:

KETTLE_HOME

变量值:

D:

kettledata-integration(具体以安装路径为准,Kettle的解压路径,直到Kettle.exe所在目录)

三、选择PATH添加环境变量:

 

变量名:

PATH 

变量值:

%KETTLE_HOME%;

(3)Kettle工具的运行

在Windows系统下运行,只需要解压kettle文件后,双击data-integration文件夹中的Spoon.bat文件

在Linux下运行则双击data-integration文件夹中的Spoon.sh文件

3.Kettle的基本概念

(1)作业(job)

负责将【转换】组织在一起进而完成某一块工作,通常我们需要把一个大的任务分解成几个逻辑上隔离的作业,当这几个作业都完成了,也就说明这项任务完成了。

 

1.JobEntry:

一个JobEntry是一个任务的一部分,它执行某些内容。

 

2.Hop:

一个Hop代表两个步骤之间的一个或者多个数据流。

一个Hop总是代表着两个JobEntry之间的连接,并且能够被原始的JobEntry设置,无条件的执行下一个JobEntry, 

直到执行成功或者失败。

 

3.Note:

一个Note是一个任务附加的文本注释信息。

1)创建作业

2)START控件

核心对象-通用-START中选取

START是一个job的任务入口,只有无条件的任务可以从此入口开始任务,也可以在此设置任务定时运行和重复运行。

3)成功控件

核心对象-通用-成功中选取

代表job的结束,没有任何数据的操作

4)转换

核心对象-通用-转换中选取

是job中具体的数据转换操作,只要把之前写好的ktr文件路径设置上去,就可以运行对应的ktr文件

(2)转换(Transformation)

定义对数据操作的容器,数据操作就是数据从输入到输出的一个过程,可以理解为比作业粒度更小一级的容器,我们将任务分解成作业,然后需要将作业分解成一个或多个转换,每个转换只完成一部分工作。

1.Value:

Value是行的一部分,并且是包含以下类型的的数据:

Strings、floatingpointNumbers、unlimitedprecisionBigNumbers、Integers、Dates、或者Boolean。

 

2.Row:

一行包含0个或者多个Values。

 

3.OutputStream:

一个OutputStream是离开一个步骤时的行的堆栈。

 

4.InputStream:

一个InputStream是进入一个步骤时的行的堆栈。

 

5.Step:

转换的一个步骤,可以是一个Stream或是其他元素。

 

6.Hop:

一个Hop代表两个步骤之间的一个或者多个数据流。

一个Hop总是代表着一个步骤的输出流和一个步骤的输入流。

 

7.Note:

一个Note是一个转换附加的文本注释信息。

1)创建转换

 创建简单的转换,首先配置数据库连接,一个转换中可以创建多个数据库连接,而且是不同类型的数据库也都可以,方便从一个数据库中抽取数据到另一个数据库中

2)表输入(数据来源)

从核心组件树菜单中拖拽一个表输入组件,用于编写sql语句

3)表输出(数据接收)

从核心组件树菜单中拖拽一个表输出组件,用于将上一步骤传递的数据保存到数据库中

4)获取系统信息

双击后弹出页面如下图:

点击类型弹出下图框如下图所示:

点击选择所需要的信息即可

5)字段选择

双击打开控件:

选择和修改:

制定需要流向输出流的字段的精度顺序和名称。

获取选择的字段:

默认获取输入流中的字段。

列映射:

指的是字段名称==》字段该名成之后的映射,如果选择了移除或者元数据,则不能生成映射

移除:

指定从输出流删除的字段。

获取移除的字段:

默认获取所有的输入流字段

元数据:

修改元数据字段的名称、类型、长度、精度等。

6)合并记录

功能描述:

合并两个数据流,并根据某个关键字排序。

这两个数据被比较,以标识相等的、变更的、删除的和新建的记录。

这个步骤允许你比较两个行流,如果你想在两个不同的时间比较数据,这个是比较有用的。

两个行流被合并,一个旧数据源,一个是新数据源

双击【合并记录】控件,根据表示字段flagfield的值判断相等、变更、删除的和新建的记录。

1.“identical”–旧数据和新数据一样

2.“changed”–数据发生了变化;

3.“new”–新数据中有而旧数据中没有的记录

4.“deleted”–旧数据中有而新数据中没有的记录

关键字段:

用于定位两个数据源中的同一条记录。

比较字段:

对于两个数据源中的同一条记录中,指定需要比较的字段。

合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果。

注意:

旧数据和新数据需要事先按照关键字段排序。

旧数据和新数据要有相同的字段名称。

例子:

旧数据:

新数据合并后的数据

field1,field2field1,field2field1;field2;flag

1,11,11;1;identical

2,22,92;9;changed

3,35,53;3;deleted

4,44;4;deleted

5;5;new

 

4.Kettle使用

打开kettle工具后会出现如下界面:

(1)新建作业

路径:

文件–>新建作业。

(2)新建2个连接(mysql、oracle)

路径:

主对象树-作业-作业1-DB连接。

连接名称:

数据库连接名称(随意填写,不能为空)

连接:

连接的数据库类型

连接方式:

数据库的连接方式(本文档只选择了Native的连接方式)

主机名称:

数据库具体的Ip地址 

数据库名称:

指定连接的数据库名称

端口号:

数据库监听的tcp/ip端口号 

用户名:

指定连接数据库登录时的用户名 

密码:

指定连接数据库登录时的密码

填写完数据库连接的基本信息后,点解界面中的测试按钮进行数据库连接测试,如成功连接会弹出如下信息,反之则会弹出出错信息,出错后根据相关的出错信息进行修改。

Oracle数据库同理:

注:

需要把数据库连接需要的jar包添加到lib文件夹下。

 

(1)Kettle实现数据库整库迁移

一、采用软件自带方式

首先在目标数据库中先创建一个抽取记录表TIME_TABLE

表内2个字段为:

etl_time(抽取时间)

table_name(抽取表名)

1.新建一个job

创建两个DB连接:

mysql、ORCL(源数据库和目标数据库连接),在菜单中找到【复制多表向导】,点击进行相关操作:

2.选择源数据库和目标数据库

3.选择所需迁移的表

4.编辑生成的job文件名:

qy.kjb,和文件目录,编辑好后+【Finish】

点击Finish之后界面如下图所示,

5.把当前抽取时间和表名记录到创建的TIME_TABLE表中,通过SQL脚本,核心对象-脚本-SQL,左键拖拽进主页面通过Shift将其连接起来

在这个SQL脚本中填写insert语句,把当前时间和表名添加到数据库抽取记录表中

 

6.点击运行按钮

二、自定义数据库迁移(oracle>>mysql)

1.创建一个主job:

数据库迁移.kjb

2.创建转换:

获取表名称.ktr

1)创建一个DB连接:

oraclesource(源数据库连接)

2) 表输入

注:

oracle中获取表名SQL:

select*fromuser_tables,mysql中为:

showtables

3)字段选择

点击【获取选择的字段】,添加新的字段,选中"TABLE_NAME"并改名成“tablename”,将其余字段删除

4)添加一个【复制记录到结果字符串】

3.新建一个job:

抽取表.kjb

1)新建一个DB连接:

mysqltarget

4.新建一个转换:

表名称变量设置.ktr

1)添加一个【从结果获取记录】

2)添加一个【设置变量】

5.添加一个【检查表是否存在】

6.新建一个转换:

创建目标库表结构.ktr

1)新建两个DB连接:

oraclesource,mysqltarget创建过程同上

2)添加一个【表输入】

 

3)添加一个【Java代码】,代码如下:

1.

publicbooleanprocessRow(StepMetaInterfacesmi,StepDataInterfacesdi)throwsKettleException

2.

3.

{

4.

5.

Object[]r=getRow();

6.

7.

8.

//本地连接获取数据库元数据

9.

10.

//org.pentaho.di.core.database.DatabaseMetadbmeta=getTransMeta().findDatabase("target");

11.

12.

13.

//资源库连接获取数据库元数据

14.

15.

org.pentaho.di.core.database.DatabaseMetadbmeta=null;

16.

17.

18.

java.util.Listlist=getTrans().getRepository().readDatabases();

19.

20.

21.

if(list!

=null&&!

list.isEmpty())

22.

23.

{

24.

25.

for(inti=0;i

26.

27.

{

28.

29.

dbmeta=(org.pentaho.di.core.database.DatabaseMeta)list.get(i);

30.

31.

if("target".equalsIgnoreCase(dbmeta.getName()))

32.

33.

{

34.

35.

break;

36.

37.

}

38.

39.

}

40.

41.

}

42.

43.

44.

if(dbmeta!

=null)

45.

46.

{

47.

48.

org.pentaho.di.core.database.Databasedb=neworg.pentaho.di.core.database.Database(dbmeta);

49.

50.

51.

try

52.

53.

{

54.

55.

db.connect();

56.

57.

58.

Stringtablename=getVariable("TABLENAME");

59.

60.

61.

//在日志中显示创建表名

62.

63.

logBasic("开始创建表:

"+tablename);

64.

65.

66.

if(tablename!

=null&&tablename.trim().length()>0)

67.

68.

{

69.

70.

71.

//获取表名称和输入行数据

72.

73.

Stringsql=db.getDDLCreationTable(tablename,getInputRowMeta());

74.

75.

76.

db.execStatement(sql.replace(";",""));

77.

78.

79.

//在日志中显示sql语句

80.

81.

logBasic(sql);

82.

83.

}

84.

85.

}

86.

87.

catch(Exceptione)

88.

89.

{

90.

91.

logError("创建表出现异常",e);

92.

93.

94.

}finally{

95.

96.

db.disconnect();

97.

98.

}

99.

100.

}

101.

102.

returnfalse;

103.

104.

}

105.

7.新建一个转换:

表抽取.ktr

1)新建两个DB连接:

oraclesource,mysqltarget创建过程同上

2)添加一个【表输入】

3)添加一个表输出

8.运行数据库迁移.kjb

(2)Kettle增量抽取方案

新建一个转换

在核心对象中找到【表输入】【表输入2】【表输出】【阻塞数据】

【执行SQL脚本】组件,并通过Shift连接在一起

【表输入】:

查询该表的最新的一次抽取时间

【表输入2】:

把上一步查询出来的最新抽取时间当做查询条件,查出要抽取的数据

【表输出】把查询出来要抽取的数据,抽取到oracle数据库中

【执行SQL语句】:

修改成目标数据库抽取过来的时间最大的一条记录,把数据抽取记录表的时间修改以便下一次抽取

表输入:

旧数据来源的步骤(来源于mysql数据库)

表输入2:

新数据来源的步骤(来源于oracle数据库)

合并记录:

将输入源与目的源的每个字段数据根据唯一字段比较后到值映射

值映射:

使用字段名为起的后面用到的一个变量名(可任意起),源值列为系统默认

1.“identical”–旧数据和新数据一样

2.“changed”–数据发生了变化;

3.“new”–新数据中有而旧数据中没有的记录

4.“deleted”–旧数据中有而新数据中没有的记录

 

过滤记录步骤:

通过flagfield的值来执行后面的分支,也可以通过Switch/Case组件执行后面的分支例如:

5.Kettle定时任务

新建一个【作业】,【核心对象】【通用】里面把【START】【转换】【成功】组件拖拽出来并通过Shift连接

【START】组件:

设置重复操作,时间间隔2秒

【转换】组件:

浏览,选择已保存的ktr文件

【成功】:

提示操作成功。

判断数据数据是否变化:

 

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > IT计算机 > 电脑基础知识

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2