Hadoop学习笔记.docx
《Hadoop学习笔记.docx》由会员分享,可在线阅读,更多相关《Hadoop学习笔记.docx(55页珍藏版)》请在冰点文库上搜索。
Hadoop学习笔记
Hadoop:
TheDefinitiveGuid学习笔记
mingyuanEmail:
cn.mingyuan@
1.MeetHadoop
1.1.Data数据
本节陈述了这样的事实:
数据量越来越大,并且来源也越来越多,我们面临的问题是如何有效的存储和分析它们。
1.2.DataStorageandAnalysis数据存储和分析
现在面临这这样的一个问题:
当磁盘的存储量随着时间的推移越来越大的时候,对磁盘上的数据的读取速度却没有多大的增长。
读取硬盘上的全部数据会花费比较长的时间,如果是写操作的话则会更慢。
一个解决的办法是同时读取多个硬盘上的数据。
例如我们有100块硬盘,而他们分别存储整个数据的1%的话,若是用并行读取的方法操作,可以在两分钟之内搞定。
只使用每块硬盘的1%当然是浪费的,但是我们可以存储100个数据集(dataset),每个1T,并且提供共享访问。
有了更短的分析时间之后,这样的系统的用户应该是乐于接受共享访问模式的。
他们的工作是按时间划分的,这样相互之间就不会有较大的影响。
从多个磁盘上进行并行读写操作是可行的,但是存在以下几个方面的问题:
1)第一个问题是硬件错误。
使用的硬件越多出错的几率就越大。
一种常用的解决方式是数据冗余,保留多分拷贝,即使一份数据处理出错,还有另外的数据。
HDFS使用的也是类似的方式,但稍有不同。
2)第二个问题是数据处理的相关性问题。
例如很多分析工作在一快磁盘上处理出来的结果需要与其他磁盘上处理处理出来的结果合并才能完成任务。
各种分布式系统也都给出了合并的策略,但是做好这方面确实是一个挑战。
MapReduce提供了一种编程模型,他将从硬盘上读写数据的问题抽象出来,转化成对一系列键值对的计算。
简而言之,Hadoop提供了一个可靠的存储和分析系统。
存储又HDFS提供,分析由MapReduce提供。
1.3.与其他系统比较
1.3.1.RDBMS
为什么我们不能使用大量的磁盘数据库做大规模的批量分析?
为什么需要MapReduce?
1)磁盘的寻道时间提高的速度低于数据的传输速度,如果数据访问模式由寻道时间支配的话,在读写数据集的一大部分的时候速度就会较流式读取慢很多,这样就出现了瓶颈。
2)另一方面在更新数据集的少量数据的时候,传统的B-树工作的比较好,但是在更新数据集的大部分数据的时候B-树就显得比MapReduce方式慢了。
MapReduce使用排序/合并操作去重建数据库(完成数据更新)。
在很多方面MapReduce可以看作是对传统关系数据库的补充。
MapReduce比较适合于需要分析整个数据集,并且要使用批处理方式,特别是特定的分析的情况;RDBMS点查询方面占优势,或在已编制索引的数据集提供低延迟的检索和更新的数据,但是数据量不能太大。
MapReduce适合一次写入,多次读取的操作,但是关系数据库就比较适合对数据集的持续更新。
还有一方面,MapReduce比较适合处理半结构化,非结构化的数据。
关系数据往往进行规则化以保证数据完整性,并删除冗余。
这样做给MapReduce提出了新的问题:
它使得读数据变成了非本地执行,而MapReduce的一个重要前提(假设)就是数据可以进行高速的流式读写。
MapReduce是可以进行线性扩展的编程模型。
一个对集群级别的数据量而写的MapReduce可以不加修改的应用于小数据量或者更大数据量的处理上。
更重要的是当你的输入数据增长一倍的时候,相应的处理时间也会增加一倍。
但是如果你把集群也增长一倍的话,处理的速度则会和没有增加数据量时候的速度一样快,这方面对SQL查询来说不见得是正确的。
随着时间的推移,关系数据库和MapReduce之间的差别变得越来越不明显,很多数据库(例如AsterData和Greenplum的数据)已经借用了一些MapReduce的思想。
另一个方面,基于MapReduce的高层次查询语言使得MapReduce系统较传统的关系数据库来说,使程序员们更容易接受。
1.3.2.GridCompuing网格计算
数据量大的时候网络带宽会成为网格计算的瓶颈。
但是MapReduce使数据和计算在一个节点上完成,这样就变成了本地的读取。
这是MapReduce高性能的核心。
MPI将控制权大大的交给了程序员,但是这就要求程序员明确的处理数据流等情况,而MapReduce只提供高层次的操作:
程序员只需考虑处理键值对的函数,而对数据流则是比较隐晦的。
在分布式计算中,如何协调各个处理器是一项很大的挑战。
最大的挑战莫过于如何很好的处理部分计算的失误。
当你不知道是不是出现错误的时候,程序还在继续运行,这就比较麻烦了。
由于MapReduce是一种非共享(Shared-nothing)的架构,当MapReduce实现检测到map或者reduce过程出错的时候,他可以将错误的部分再执行一次。
MPI程序员则需要明确的考虑检查点和恢复,这虽然给程序员很大自由,但是也使得程序变得难写。
也许你会觉得mapreduce模式过于严格,程序员面对的都是些键值对,并且mapper和reducer之间很少来往,这样的模式能做一些有用的或者是非凡的事情吗?
答案是肯定的,Google已经把Mapreduce使用在了很多方面——从图像分析到基于图的问题,再到机器学习,MapReduce工作的很好。
虽然他不是万能的,但是他确是一种通用的数据处理工具。
1.3.3.VolunteerComputing志愿计算
志愿计算主要是让志愿者贡献CPU时间来完成计算。
MapReduce是针对在一个高聚合网络连接的数据中心中进行的可信的、使用专用的硬件工作持续数分钟或者数个小时而设计的。
相比之下,志愿计算则是在不可信的、链接速度有很大差异的、没有数据本地化特性的,互联网上的计算机上运行永久的(超长时间的)计算,
1.4.Hadoop简史(略)
1.5.TheApacheHadoopProject(略)
2.MapReduce
2.1.AWeatherDataset一个天气数据集
数据是NCDC的数据,我们关注以下特点:
1)数据是半格式化的
2)目录里面存放的是从1901-2001年一个世纪的记录,是gzip压缩过的文件。
3)以行为单位,使用ASCII格式存储,每行就是一条记录
4)每条记录我们关注一些基本的元素,比如温度,这些数据在每条数据中都会出现,并且宽度也是固定的。
下面是一条记录的格式,为了便于显示,做了一部分调整。
2.2.AnalyzingtheDatawithUnixTools使用Unix工具分析数据
以分析某年份的最高温度为例,下面是一段Unix的脚本程序:
这段脚本的执行过程如下:
脚本循环处理每一个压缩的年份文件,首先打印出年份,然后对每一个文件使用awk处理。
Awk脚本从数据中解析出两个字段:
一个airtemperature,一个qualitycode。
airtemperature值加0被转换成整形。
接下来查看温度数据是否有效(9999表示在NCDC数据集中丢失的值),并且检查qualitycode是不是可信并没有错误的。
如果读取一切正常,temp将与目前的最大值比较,如果出现新的最大值,则更新当前max的值。
当文件中所有行的数据都被处理之后,开始执行End程序块,并且打印出最大值。
程序执行之后将产生如下样式的输出:
处理结果之中,温度的值被放大了10倍。
所以,1901年的温度应该是31.7度,1902年的温度应该是24.4度……
所有的,一个世纪的气象记录在一台EC2High-CPUExtraLargeInstance上耗时42分钟。
为了加速处理速度,我们将程序的某些部分进行并行执行。
这在理论上是比较简单的,我们可以按照年份来在不同的处理器上执行,使用所有可用的硬件线程,但是还是有些问题:
1)把任务切分成相同大小的块不总是那么容易的。
这这种情况下,不同年份的文件大小有很大不同,这样就会导致一些过程较早的完成,尽管这些他们可以进行下一步的工作,但是总的运行时间是由耗费时间最长的文件所决定的。
一种可供选择的尝试是将输入分成固定大小的块,并把它们分配给处理进程。
2)合并单独处理出来的结果还需要进一步的处理。
在这种情况下,一个年份的结果对于其他年份来说是独立的,并且可能经过联接所有的结果,并按照年份进行排序之后被合并。
如果使用固定大小块的方式,合并是很脆弱的。
例如,某一年份的数据可能被分到不同的块中,并且被单独处理。
我们最终也会得每块数据的最高温度,但是这时候我们最后一步变成了在这些最大值中,为每一个年份找出最大值。
3)人们仍旧被单机的处理能力所束缚。
如果在一台拥有确定数量处理器的计算机上面执行程序的的开销是20分钟的话,你也不能可能再有所提高了。
并且有些数据集的数据量已经超出了单台计算机的处理能力。
当我们开始使用多台机器的时候,其它一大堆因素就跳了出来,主要是协调和可靠性的问题。
谁掌控全局?
怎么进行处理器的失效处理?
所以,尽管在理论上并行处理是可行的,但是实践上却是麻烦的。
使用一个类似于Hadoop的框架将会有很大的帮助。
2.3.AnalyzingtheDatawithHadoop使用Hadoop分析数据
为了使用Hadoop并行处理的长处,我们需要将程序做成MapReduce格式。
经过一些本地的、小数据量的测试之后,我们将可以把程序放在集群上进行运行。
2.3.1.MapandReduce
MapReduce将工作分为map阶段和reduce阶段,每个阶段都将键值对作为输入输入,键值对的类型可以由程序员选择。
程序员还指定两个函数:
map和reduce函数。
Map阶段的输入数据是NCDC的原始数据,我们选择文本格式输入,这样可以把记录中的每一行作为文本value。
Key是当前行离开始行的偏移量,但是我们并不需要这个key,所以省去。
我们的Map函数比较简单,仅仅从输入中析取出temperature。
从这个意义上来说,map函数仅仅是完成了数据的准备阶段,这样使得reducer函数可以基于它查找历年的最高温度。
Map函数也是一个很好的过滤阶段,这里可以过滤掉丢失、置疑、错误的temperature数据。
形象化一点:
下面是输入数据
下面的键值对给map函数处理,其中加粗的是有用的数据
处理之后的结果如下:
经过以上的处理之后还需要在mapreduce框架中进行进一步的处理,主要有排序和按照key给键值对给key-value排序。
经过这一番处理之后的结果如下:
上面的数据将传递给reduce之后,reduce所需要做的工作仅仅是遍历这些数据,找出最大值,产生最终的输出结果:
以上过程可以用下图描述:
2.3.2.JavaMapReduce
Map函数实现了mapper接口,此接口声明了一个map()函数。
下面是map实现
importjava.io.IOException;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapred.MapReduceBase;
importorg.apache.hadoop.mapred.Mapper;
importorg.apache.hadoop.mapred.OutputCollector;
importorg.apache.hadoop.mapred.Reporter;
publicclassMaxTemperatureMapperextendsMapReduceBaseimplements
Mapper{
privatestaticfinalintMISSING=9999;
publicvoidmap(LongWritablekey,Textvalue,
OutputCollectoroutput,Reporterreporter)
throwsIOException{
Stringline=value.toString();
Stringyear=line.substring(15,19);
intairTemperature;
if(line.charAt(87)=='+'){//parseIntdoesn'tlikeleadingplus
//signs
airTemperature=Integer.parseInt(line.substring(88,92));
}else{
airTemperature=Integer.parseInt(line.substring(87,92));
}
Stringquality=line.substring(92,93);
if(airTemperature!
=MISSING&&quality.matches("[01459]")){
output.collect(newText(year),newIntWritable(airTemperature));
}
}
}
Mapper是一个泛型类型,有四个参数分别代表Map函数的inputkey,inputvalue,outputkey,outputvalue的类型。
对于本例来说,inputkey是一个longinteger的偏移量,inputvalue是一行文本,outputkey是年份,outputvalue是气温(整形)。
除了Java的数据类型之外,Hadoop也提供了他自己的基本类型,这些类型为网络序列化做了专门的优化。
可以在org.apache.hadoop.io包中找到他们。
比如LongWritable相当于Java中的Long,Text相当于String而IntWritable在相当于Integer。
map()方法传入一个key和一个value。
我们将Text类型的value转化成Java的String,然后用String的substring方法取出我偶们需要的部分。
map()方法也提供了OutputCollector的一个实例,用来写输出数据。
在次情况下,我们将year封装成Text,而将temperature包装成IntWritable类型。
只要在temperature值出现并且qualitycode表示temperature读取正常的情况下我们才进行数据的写入。
下面是Reduce函数的类似实现,仅用了一个Reducer接:
importjava.io.IOException;
importjava.util.Iterator;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapred.MapReduceBase;
importorg.apache.hadoop.mapred.OutputCollector;
importorg.apache.hadoop.mapred.Reducer;
importorg.apache.hadoop.mapred.Reporter;
publicclassMaxTemperatureReducerextendsMapReduceBaseimplements
Reducer{
publicvoidreduce(Textkey,Iteratorvalues,
OutputCollectoroutput,Reporterreporter)
throwsIOException{
intmaxValue=Integer.MIN_VALUE;
while(values.hasNext()){
maxValue=Math.max(maxValue,values.next().get());
}
output.collect(key,newIntWritable(maxValue));
}
}
类似的,Reducer也有四个参数来分别标记输入输出。
Reduce函数的输入类型必须对应于Map函数的输出,拿本例来说,输入必须是:
Text,IntWritable类型。
Reduce在本例输出结果是Text和IntWritbale类型,year和与其对应的maxValue是经过遍历、比较之后得到的。
下面的一段代码执行了MapReduce工作:
importjava.io.IOException;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapred.FileInputFormat;
importorg.apache.hadoop.mapred.FileOutputFormat;
importorg.apache.hadoop.mapred.JobClient;
importorg.apache.hadoop.mapred.JobConf;
publicclassMaxTemperature{
publicstaticvoidmain(String[]args)throwsIOException{
if(args.length!
=2){
System.err
.println("Usage:
MaxTemperature");
System.exit(-1);
}
JobConfconf=newJobConf(MaxTemperature.class);
conf.setJobName("Maxtemperature");
FileInputFormat.addInputPath(conf,newPath(args[0]));
FileOutputFormat.setOutputPath(conf,newPath(args[1]));
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
JobClient.runJob(conf);
}
}
JobConf对象构成了mapReducejob的说明,给出了job的总体控制。
当执行MapReduce工作的时候我们需要将代码打包成一个jar文件(这个文件将被Hadoop在集群中分发)。
我们并没有指定jar文件,但是我们传递了一个class给JobConf的构造函数,Hadoop将利用它通过查找包含这个类的jar文件而去定位相关的jar件。
之后我们指定input、output路径。
FileInputFormat.addInputPath(conf,newPath(args[0]));
FileInputFormat的静态方法addInputPath来添加inputpath,inputpath可以是文件名或者目录,如果是目录的话,在目录下面的文件都会作为输入。
addInputPath可以调用多次。
FileOutputFormat.setOutputPath(conf,newPath(args[1]));
FileOutputFormat的setOutputPath()方法指定outputpath。
这个目录在运行job之前是不应该存在的,这样可以阻止数据丢失。
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReducer.class);
指定了mapper和reducer类。
conf.setOutputKeyClass(Text.class);设置outputkey类型
conf.setOutputValueClass(IntWritable.class);设置outputvalue类型
一般map和reduce的key、value类型都是一样的,如果不一样的话可以调用setMapOutputKeyClass()和setMapOutputValueClass()来设置。
输入类型由inputformat控制,本例使用的是默认的Text格式,所以没有显式指定。
JobClient.runJob(conf);提交工作,等待工作完成。
2.4.TheNewJavaMapReduceAPI
0.20.0版本的Hadoop新增了一个ContextObject,为API将来进化做准备。
新旧api不兼容,要想使用新api的特性,程序需要重写。
主要有以下几处重大改进:
1)ThenewAPIfavorsabstractclassesoverinterfaces,sincetheseareeasiertoevolve.
Forexample,youcanaddamethod(withadefaultimplementation)toanabstract
classwithoutbreakingoldimplementationsoftheclass.InthenewAPI,the
MapperandReducerinterfacesarenowabstractclasses.
2)ThenewAPIisintheorg.apache.hadoop.mapreducepackage(andsubpackages).
TheoldAPIisfoundinorg.apache.hadoop.mapred.
3)ThenewAPImakesextensiveuseofcontextobjectsthatallowtheusercodeto
communicatewiththeMapReducesystem.TheMapContext,forexample,essentially
uni