Kettle使用文档.docx
《Kettle使用文档.docx》由会员分享,可在线阅读,更多相关《Kettle使用文档.docx(44页珍藏版)》请在冰点文库上搜索。
![Kettle使用文档.docx](https://file1.bingdoc.com/fileroot1/2023-6/5/c34082c1-8b9b-49d3-b100-af682bf4b474/c34082c1-8b9b-49d3-b100-af682bf4b4741.gif)
Kettle使用文档
1.Kettle简介
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。
Kettle中文名称叫水壶,该项目的主程序员MATT希望把各种数据放到一个壶里,然后以一种指定的格式流出。
Kettle主要由资源库、数据库、作业(job)、转换、步骤五部分组成,资源库是用来存储用户所编写的作业和转换(即kjb文件和ktr文件)一般是在数据库中存储,方便用户的查找和使用。
数据库就是处理数据是需要的数据库。
作业是用来确定一个工程中需要使用的转换和转换的执行顺序。
转换是数据在ktr文件中的具体转换过程,类似于Java的一个方法,而作业就类似于java的一个类,它可以调用各种不同的方法(转换)。
2.Kettle框架搭建
(1)下载安装
可以从http:
//kettle.pentaho.org下载最新版的Kettle软件,同时,Kettle是绿色软件,下载后,解压到任意目录即可。
(2)配置环境变量
使用Kettle前提是配置好Java的环境变量,因为Kettle是java编写,需要本地的JVM的运行环境。
配置Java环境变量可参考:
配置Kettle环境变量步骤:
一、在系统的环境变量中添加KETTLE_HOME变量,目录指向kettle的安装目录:
D:
kettledata-integration(具体以安装路径为准)
二、新建系统变量:
KETTLE_HOME
变量值:
D:
kettledata-integration(具体以安装路径为准,Kettle的解压路径,直到Kettle.exe所在目录)
三、选择PATH添加环境变量:
变量名:
PATH
变量值:
%KETTLE_HOME%;
(3)Kettle工具的运行
在Windows系统下运行,只需要解压kettle文件后,双击data-integration文件夹中的Spoon.bat文件
在Linux下运行则双击data-integration文件夹中的Spoon.sh文件
3.Kettle的基本概念
(1)作业(job)
负责将【转换】组织在一起进而完成某一块工作,通常我们需要把一个大的任务分解成几个逻辑上隔离的作业,当这几个作业都完成了,也就说明这项任务完成了。
1.JobEntry:
一个JobEntry是一个任务的一部分,它执行某些内容。
2.Hop:
一个Hop代表两个步骤之间的一个或者多个数据流。
一个Hop总是代表着两个JobEntry之间的连接,并且能够被原始的JobEntry设置,无条件的执行下一个JobEntry,
直到执行成功或者失败。
3.Note:
一个Note是一个任务附加的文本注释信息。
1)创建作业
2)START控件
核心对象-通用-START中选取
START是一个job的任务入口,只有无条件的任务可以从此入口开始任务,也可以在此设置任务定时运行和重复运行。
3)成功控件
核心对象-通用-成功中选取
代表job的结束,没有任何数据的操作
4)转换
核心对象-通用-转换中选取
是job中具体的数据转换操作,只要把之前写好的ktr文件路径设置上去,就可以运行对应的ktr文件
(2)转换(Transformation)
定义对数据操作的容器,数据操作就是数据从输入到输出的一个过程,可以理解为比作业粒度更小一级的容器,我们将任务分解成作业,然后需要将作业分解成一个或多个转换,每个转换只完成一部分工作。
1.Value:
Value是行的一部分,并且是包含以下类型的的数据:
Strings、floatingpointNumbers、unlimitedprecisionBigNumbers、Integers、Dates、或者Boolean。
2.Row:
一行包含0个或者多个Values。
3.OutputStream:
一个OutputStream是离开一个步骤时的行的堆栈。
4.InputStream:
一个InputStream是进入一个步骤时的行的堆栈。
5.Step:
转换的一个步骤,可以是一个Stream或是其他元素。
6.Hop:
一个Hop代表两个步骤之间的一个或者多个数据流。
一个Hop总是代表着一个步骤的输出流和一个步骤的输入流。
7.Note:
一个Note是一个转换附加的文本注释信息。
1)创建转换
创建简单的转换,首先配置数据库连接,一个转换中可以创建多个数据库连接,而且是不同类型的数据库也都可以,方便从一个数据库中抽取数据到另一个数据库中
2)表输入(数据来源)
从核心组件树菜单中拖拽一个表输入组件,用于编写sql语句
3)表输出(数据接收)
从核心组件树菜单中拖拽一个表输出组件,用于将上一步骤传递的数据保存到数据库中
4)获取系统信息
双击后弹出页面如下图:
点击类型弹出下图框如下图所示:
点击选择所需要的信息即可
5)字段选择
双击打开控件:
选择和修改:
制定需要流向输出流的字段的精度顺序和名称。
获取选择的字段:
默认获取输入流中的字段。
列映射:
指的是字段名称==》字段该名成之后的映射,如果选择了移除或者元数据,则不能生成映射
移除:
指定从输出流删除的字段。
获取移除的字段:
默认获取所有的输入流字段
元数据:
修改元数据字段的名称、类型、长度、精度等。
6)合并记录
功能描述:
合并两个数据流,并根据某个关键字排序。
这两个数据被比较,以标识相等的、变更的、删除的和新建的记录。
这个步骤允许你比较两个行流,如果你想在两个不同的时间比较数据,这个是比较有用的。
两个行流被合并,一个旧数据源,一个是新数据源
双击【合并记录】控件,根据表示字段flagfield的值判断相等、变更、删除的和新建的记录。
1.“identical”–旧数据和新数据一样
2.“changed”–数据发生了变化;
3.“new”–新数据中有而旧数据中没有的记录
4.“deleted”–旧数据中有而新数据中没有的记录
关键字段:
用于定位两个数据源中的同一条记录。
比较字段:
对于两个数据源中的同一条记录中,指定需要比较的字段。
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果。
注意:
旧数据和新数据需要事先按照关键字段排序。
旧数据和新数据要有相同的字段名称。
例子:
旧数据:
新数据合并后的数据
field1,field2field1,field2field1;field2;flag
1,11,11;1;identical
2,22,92;9;changed
3,35,53;3;deleted
4,44;4;deleted
5;5;new
4.Kettle使用
打开kettle工具后会出现如下界面:
(1)新建作业
路径:
文件–>新建作业。
(2)新建2个连接(mysql、oracle)
路径:
主对象树-作业-作业1-DB连接。
连接名称:
数据库连接名称(随意填写,不能为空)
连接:
连接的数据库类型
连接方式:
数据库的连接方式(本文档只选择了Native的连接方式)
主机名称:
数据库具体的Ip地址
数据库名称:
指定连接的数据库名称
端口号:
数据库监听的tcp/ip端口号
用户名:
指定连接数据库登录时的用户名
密码:
指定连接数据库登录时的密码
填写完数据库连接的基本信息后,点解界面中的测试按钮进行数据库连接测试,如成功连接会弹出如下信息,反之则会弹出出错信息,出错后根据相关的出错信息进行修改。
Oracle数据库同理:
注:
需要把数据库连接需要的jar包添加到lib文件夹下。
(1)Kettle实现数据库整库迁移
一、采用软件自带方式
首先在目标数据库中先创建一个抽取记录表TIME_TABLE
表内2个字段为:
etl_time(抽取时间)
table_name(抽取表名)
1.新建一个job
创建两个DB连接:
mysql、ORCL(源数据库和目标数据库连接),在菜单中找到【复制多表向导】,点击进行相关操作:
2.选择源数据库和目标数据库
3.选择所需迁移的表
4.编辑生成的job文件名:
qy.kjb,和文件目录,编辑好后+【Finish】
点击Finish之后界面如下图所示,
5.把当前抽取时间和表名记录到创建的TIME_TABLE表中,通过SQL脚本,核心对象-脚本-SQL,左键拖拽进主页面通过Shift将其连接起来
在这个SQL脚本中填写insert语句,把当前时间和表名添加到数据库抽取记录表中
6.点击运行按钮
二、自定义数据库迁移(oracle>>mysql)
1.创建一个主job:
数据库迁移.kjb
2.创建转换:
获取表名称.ktr
1)创建一个DB连接:
oraclesource(源数据库连接)
2) 表输入
注:
oracle中获取表名SQL:
select*fromuser_tables,mysql中为:
showtables
3)字段选择
点击【获取选择的字段】,添加新的字段,选中"TABLE_NAME"并改名成“tablename”,将其余字段删除
4)添加一个【复制记录到结果字符串】
3.新建一个job:
抽取表.kjb
1)新建一个DB连接:
mysqltarget
4.新建一个转换:
表名称变量设置.ktr
1)添加一个【从结果获取记录】
2)添加一个【设置变量】
5.添加一个【检查表是否存在】
6.新建一个转换:
创建目标库表结构.ktr
1)新建两个DB连接:
oraclesource,mysqltarget创建过程同上
2)添加一个【表输入】
3)添加一个【Java代码】,代码如下:
1.
publicbooleanprocessRow(StepMetaInterfacesmi,StepDataInterfacesdi)throwsKettleException
2.
3.
{
4.
5.
Object[]r=getRow();
6.
7.
8.
//本地连接获取数据库元数据
9.
10.
//org.pentaho.di.core.database.DatabaseMetadbmeta=getTransMeta().findDatabase("target");
11.
12.
13.
//资源库连接获取数据库元数据
14.
15.
org.pentaho.di.core.database.DatabaseMetadbmeta=null;
16.
17.
18.
java.util.Listlist=getTrans().getRepository().readDatabases();
19.
20.
21.
if(list!
=null&&!
list.isEmpty())
22.
23.
{
24.
25.
for(inti=0;i26.
27.
{
28.
29.
dbmeta=(org.pentaho.di.core.database.DatabaseMeta)list.get(i);
30.
31.
if("target".equalsIgnoreCase(dbmeta.getName()))
32.
33.
{
34.
35.
break;
36.
37.
}
38.
39.
}
40.
41.
}
42.
43.
44.
if(dbmeta!
=null)
45.
46.
{
47.
48.
org.pentaho.di.core.database.Databasedb=neworg.pentaho.di.core.database.Database(dbmeta);
49.
50.
51.
try
52.
53.
{
54.
55.
db.connect();
56.
57.
58.
Stringtablename=getVariable("TABLENAME");
59.
60.
61.
//在日志中显示创建表名
62.
63.
logBasic("开始创建表:
"+tablename);
64.
65.
66.
if(tablename!
=null&&tablename.trim().length()>0)
67.
68.
{
69.
70.
71.
//获取表名称和输入行数据
72.
73.
Stringsql=db.getDDLCreationTable(tablename,getInputRowMeta());
74.
75.
76.
db.execStatement(sql.replace(";",""));
77.
78.
79.
//在日志中显示sql语句
80.
81.
logBasic(sql);
82.
83.
}
84.
85.
}
86.
87.
catch(Exceptione)
88.
89.
{
90.
91.
logError("创建表出现异常",e);
92.
93.
94.
}finally{
95.
96.
db.disconnect();
97.
98.
}
99.
100.
}
101.
102.
returnfalse;
103.
104.
}
105.
7.新建一个转换:
表抽取.ktr
1)新建两个DB连接:
oraclesource,mysqltarget创建过程同上
2)添加一个【表输入】
3)添加一个表输出
8.运行数据库迁移.kjb
(2)Kettle增量抽取方案
新建一个转换
在核心对象中找到【表输入】【表输入2】【表输出】【阻塞数据】
【执行SQL脚本】组件,并通过Shift连接在一起
【表输入】:
查询该表的最新的一次抽取时间
【表输入2】:
把上一步查询出来的最新抽取时间当做查询条件,查出要抽取的数据
【表输出】把查询出来要抽取的数据,抽取到oracle数据库中
【执行SQL语句】:
修改成目标数据库抽取过来的时间最大的一条记录,把数据抽取记录表的时间修改以便下一次抽取
表输入:
旧数据来源的步骤(来源于mysql数据库)
表输入2:
新数据来源的步骤(来源于oracle数据库)
合并记录:
将输入源与目的源的每个字段数据根据唯一字段比较后到值映射
值映射:
使用字段名为起的后面用到的一个变量名(可任意起),源值列为系统默认
1.“identical”–旧数据和新数据一样
2.“changed”–数据发生了变化;
3.“new”–新数据中有而旧数据中没有的记录
4.“deleted”–旧数据中有而新数据中没有的记录
过滤记录步骤:
通过flagfield的值来执行后面的分支,也可以通过Switch/Case组件执行后面的分支例如:
5.Kettle定时任务
新建一个【作业】,【核心对象】【通用】里面把【START】【转换】【成功】组件拖拽出来并通过Shift连接
【START】组件:
设置重复操作,时间间隔2秒
【转换】组件:
浏览,选择已保存的ktr文件
【成功】:
提示操作成功。
判断数据数据是否变化: