Hadoop环境搭建火龙果.docx
《Hadoop环境搭建火龙果.docx》由会员分享,可在线阅读,更多相关《Hadoop环境搭建火龙果.docx(30页珍藏版)》请在冰点文库上搜索。
Hadoop环境搭建火龙果
Hadoop环境搭建及相关知识
下载
为了获取Hadoop的发行版,从Apache的某个镜像服务器上下载最近的稳定发行版。
运行Hadoop集群的准备工作
解压所下载的Hadoop发行版。
编辑conf/hadoop-env.sh文件,至少需要将JAVA_HOME设置为Java安装根路径。
尝试如下命令:
$bin/hadoop
将会显示hadoop脚本的使用文档。
现在你可以用以下三种支持的模式中的一种启动Hadoop集群:
单机模式
伪分布式模式
完全分布式模式
单机模式的操作方法
默认情况下,Hadoop被配置成以非分布式模式运行的一个独立Java进程。
这对调试非常有帮助。
下面的实例将未解压的conf目录拷贝作为输入,查找并显示匹配给定正则表达式的条目。
输出写入到指定的output目录。
$mkdirinput
$cpconf/*.xmlinput
$bin/hadoopjarhadoop-*-examples.jargrepinputoutput'dfs[a-z.]+'
$catoutput/*
伪分布式模式的操作方法
Hadoop可以在单节点上以所谓的伪分布式模式运行,此时每一个Hadoop守护进程都作为一个独立的Java进程运行。
配置
使用如下的conf/hadoop-site.xml:
fs.default.name
localhost:
9000
mapred.job.tracker
localhost:
9001
dfs.replication
1
免密码ssh设置
现在确认能否不输入口令就用ssh登录localhost:
$sshlocalhost
如果不输入口令就无法用ssh登陆localhost,执行下面的命令:
$ssh-keygen-tdsa-P''-f~/.ssh/id_dsa
$cat~/.ssh/id_dsa.pub>>~/.ssh/authorized_keys
执行
格式化一个新的分布式文件系统:
$bin/hadoopnamenode-format
启动Hadoop守护进程:
$bin/start-all.sh
Hadoop守护进程的日志写入到${HADOOP_LOG_DIR}目录(默认是${HADOOP_HOME}/logs).
浏览NameNode和JobTracker的网络接口,它们的地址默认为:
NameNode-http:
//localhost:
50070/
JobTracker-http:
//localhost:
50030/
将输入文件拷贝到分布式文件系统:
$bin/hadoopdfs-putconfinput
运行发行版提供的示例程序:
$bin/hadoopjarhadoop-*-examples.jargrepinputoutput'dfs[a-z.]+'
查看输出文件:
将输出文件从分布式文件系统拷贝到本地文件系统查看:
$bin/hadoopdfs-getoutputoutput
$catoutput/*
或者
查看分布式文件系统上的输出文件:
$bin/hadoopdfs-catoutput/*
完成全部操作后,停止守护进程:
$bin/stop-all.sh
分布式模式的操作方法
安装Hadoop集群通常要将安装软件解压到集群内的所有机器上。
通常,集群里的一台机器被指定为NameNode,另一台不同的机器被指定为JobTracker。
这些机器是masters。
余下的机器即作为DataNode也作为TaskTracker。
这些机器是slaves。
我们用HADOOP_HOME指代安装的根路径。
通常,集群里的所有机器的HADOOP_HOME路径相同。
配置
接下来的几节描述了如何配置Hadoop集群。
配置文件
对Hadoop的配置通过conf/目录下的两个重要配置文件完成:
hadoop-default.xml-只读的默认配置。
hadoop-site.xml-集群特有的配置。
要了解更多关于这些配置文件如何影响Hadoop框架的细节,请看这里。
此外,通过设置conf/hadoop-env.sh中的变量为集群特有的值,你可以对bin/目录下的Hadoop脚本进行控制。
集群配置
要配置Hadoop集群,你需要设置Hadoop守护进程的运行环境和Hadoop守护进程的运行参数。
Hadoop守护进程指NameNode/DataNode和JobTracker/TaskTracker。
配置Hadoop守护进程的运行环境
管理员可在conf/hadoop-env.sh脚本内对Hadoop守护进程的运行环境的做特别指定。
最少,你得设定JAVA_HOME使之在每一远端节点上都被正确设置。
其它可定制的常用参数还包括:
HADOOP_LOG_DIR-守护进程日志文件的存放目录。
如果不存在会被自动创建。
HADOOP_HEAPSIZE-最大可用的堆大小,单位为MB。
比如,2000MB。
配置Hadoop守护进程的运行参数
这部分涉及Hadoop集群的重要参数,这些参数在conf/hadoop-site.xml中指定。
参数
取值
备注
fs.default.name
NameNode的主机名或者IP。
主机:
端口。
mapred.job.tracker
JobTracker的主机名或者IP。
主机:
端口。
dfs.name.dir
NameNode持久存储名字空间及事务日志的本地文件系统路径。
当这个值是一个逗号分割的目录列表时,nametable数据将会被复制到所有目录中做冗余备份。
dfs.data.dir
DataNode存放块数据的本地文件系统路径,逗号分割的列表。
当这个值是逗号分割的目录列表时,数据将被存储在所有目录下,通常分布在不同设备上。
mapred.system.dir
Map-Reduce框架存储系统文件的HDFS路径。
比如/hadoop/mapred/system/。
这个路径是默认文件系统(HDFS)下的路径,须从服务器和客户端上均可访问。
mapred.local.dir
本地文件系统下逗号分割的路径列表,Map-Reduce临时数据存放的地方。
多路径有助于利用磁盘i/o。
mapred.tasktracker
.{map|reduce}.tasks.maximum
某一TaskTracker上可运行的最大map/reduce任务数,这些任务将同时各自运行。
默认为2(2个map和2个reduce),可依据硬件情况更改。
dfs.hosts/dfs.hosts.exclude
许可/拒绝DataNode列表。
如有必要,用这个文件控制许可的datanode列表。
mapred.hosts
/mapred.hosts.exclude
许可/拒绝TaskTracker列表。
如有必要,用这个文件控制许可的tasktracker列表。
通常,上述参数被标记为final以确保它们不被用户应用更改。
现实世界的集群配置
这节罗列在大规模集群上运行sort基准测试(benchmark)时使用到的一些非缺省配置。
运行sort900需要的一些非缺省配置值,sort900即在900个节点的集群上对9TB的数据进行排序:
参数
取值
备注
dfs.block.size
134217728
针对大文件系统,HDFS的块大小取128MB。
dfs.namenode.handler.count
40
启动更多的NameNode服务线程去处理来自大量DataNode的RPC请求。
mapred.reduce.parallel.copies
20
reduce需要启动更多的map输出拷贝器以获取大量map的输出。
mapred.child.java.opts
-Xmx512M
为map/reduce的子jvm使用更大的堆。
fs.inmemory.size.mb
200
为reduce阶段合并map输出所需的内存文件系统分配更多的内存。
io.sort.factor
100
更多的流个数,对文件排序时这些流将同时被归并。
io.sort.mb
200
提高排序时的内存上限。
io.file.buffer.size
131072
SequenceFile中用到的读/写缓存大小。
运行sort1400和sort2000时需要更新的配置,即在1400个节点上对14TB的数据进行排序和在2000个节点上对20TB的数据进行排序:
参数
取值
备注
mapred.job.tracker.handler.count
60
启用更多的JobTracker服务线程去处理来自大量TaskTracker的RPC请求。
mapred.reduce.parallel.copies
50
tasktracker.http.threads
50
为TaskTracker的Http服务启用更多的工作线程。
reduce通过Http服务获取map的中间输出。
mapred.child.java.opts
-Xmx1024M
Slaves
通常,你选择集群中的一台机器作为NameNode,另外一台不同的机器作为JobTracker。
余下的机器即作为DataNode又作为TaskTracker,这些被称之为slaves。
在conf/slaves文件中列出所有slave的主机名或者地址,一行一个。
日志
Hadoop使用Apachelog4j来记录日志,这是通过ApacheCommonsLogging框架来完成的。
编辑conf/log4j.properties文件可以改变Hadoop守护进程的日志配置(日志格式等)。
历史日志
作业的历史文件集中存放在hadoop.job.history.location,这个也可以是在分布式文件系统下的路径,其默认值为${HADOOP_LOG_DIR}/history。
jobtracker的webUI上有历史日志的webUI链接。
历史文件在用户指定的目录hadoop.job.history.user.location也会记录一份,这个配置的缺省值为作业的输出目录。
这些文件被存放在指定路径下的“_logs/history/”目录中。
因此,默认情况下日志文件会在“mapred.output.dir/_logs/history/”下。
如果将hadoop.job.history.user.location指定为值none,系统将不再记录日志。
用户可使用以下命令在指定路径下查看历史日志汇总
$bin/hadoopjob-historyoutput-dir
这条命令会显示作业的细节信息,失败和中止的提示信息。
关于作业的更多细节,比如成功的任务,以及对每个任务的所做的尝试等可以用下面的命令查看
$bin/hadoopjob-historyalloutput-dir
一但全部必要的配置完成,将这些文件分发到所有机器的HADOOP_CONF_DIR路径下,通常是${HADOOP_HOME}/conf。
Hadoop的机架感知
HDFS和Map-Reduce的组件是能够感知机架的。
NameNode和JobTracker通过调用管理员配置模块中的APIresolve来获取集群里每个slave的机架id。
该API将slave的DNS名称(或者IP地址)转换成机架id。
使用哪个模块是通过配置项topology.node.switch.mapping.impl来指定的。
模块的默认实现会调用topology.script.file.name配置项指定的一个的脚本/命令。
如果topology.script.file.name未被设置,对于所有传入的IP地址,模块会返回/default-rack作为机架id。
在Map-Reduce部分还有一个额外的配置项mapred.cache.task.levels,该参数决定cache的级数(在网络拓扑中)。
例如,如果默认值是2,会建立两级的cache-一级针对主机(主机->任务的映射)另一级针对机架(机架->任务的映射)。
启动Hadoop
启动Hadoop集群需要启动HDFS集群和Map-Reduce集群。
格式化一个新的分布式文件系统:
$bin/hadoopnamenode-format
在分配的NameNode上,运行下面的命令启动HDFS:
$bin/start-dfs.sh
bin/start-dfs.sh脚本会参照NameNode上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动DataNode守护进程。
在分配的JobTracker上,运行下面的命令启动Map-Reduce:
$bin/start-mapred.sh
bin/start-mapred.sh脚本会参照JobTracker上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上启动TaskTracker守护进程。
停止Hadoop
在分配的NameNode上,执行下面的命令停止HDFS:
$bin/stop-dfs.sh
bin/stop-dfs.sh脚本会参照NameNode上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上停止DataNode守护进程。
在分配的JobTracker上,运行下面的命令停止Map-Reduce:
$bin/stop-mapred.sh
bin/stop-mapred.sh脚本会参照JobTracker上${HADOOP_CONF_DIR}/slaves文件的内容,在所有列出的slave上停止TaskTracker守护进程。
WordCountV2实践
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.filecache.DistributedCache;
importorg.apache.hadoop.conf.*;
importorg.apache.hadoop.io.*;
importorg.apache.hadoop.mapred.*;
importorg.apache.hadoop.util.*;
publicclassWordCountV2extendsConfiguredimplementsTool{
publicstaticclassMapextendsMapReduceBaseimplementsMapper{
staticenumCounters{INPUT_WORDS}
privatefinalstaticIntWritableone=newIntWritable
(1);
privateTextword=newText();
privatebooleancaseSensitive=true;
privateSetpatternsToSkip=newHashSet();
privatelongnumRecords=0;
privateStringinputFile;
publicvoidconfigure(JobConfjob){
caseSensitive=job.getBoolean("wordcount.case.sensitive",true);
inputFile=job.get("map.input.file");
if(job.getBoolean("wordcount.skip.patterns",false)){
Path[]patternsFiles=newPath[0];
try{
patternsFiles=DistributedCache.getLocalCacheFiles(job);
}catch(IOExceptionioe){
System.err.println("Caughtexceptionwhilegettingcachedfiles:
"+StringUtils.stringifyException(ioe));
}
for(PathpatternsFile:
patternsFiles){
parseSkipFile(patternsFile);
}
}
}
privatevoidparseSkipFile(PathpatternsFile){
try{
BufferedReaderfis=newBufferedReader(newFileReader(patternsFile.toString()));
Stringpattern=null;
while((pattern=fis.readLine())!
=null){
patternsToSkip.add(pattern);
}
}catch(IOExceptionioe){
System.err.println("Caughtexceptionwhileparsingthecachedfile'"+patternsFile+"':
"+StringUtils.stringifyException(ioe));
}
}
publicvoidmap(LongWritablekey,Textvalue,OutputCollectoroutput,Reporterreporter)throwsIOException{
Stringline=(caseSensitive)?
value.toString():
value.toString().toLowerCase();
for(Stringpattern:
patternsToSkip){
line=line.replaceAll(pattern,"");
}
StringTokenizertokenizer=newStringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
output.collect(word,one);
reporter.incrCounter(Counters.INPUT_WORDS,1);
}
if((++numRecords%100)==0){
reporter.setStatus("Finishedprocessing"+numRecords+"records"+"fromtheinputfile:
"+inputFile);
}
}
}
publicstaticclassReduceextendsMapReduceBaseimplementsReducer{
publicvoidreduce(Textkey,Iteratorvalues,OutputCollectoroutput,Reporterreporter)throwsIOException{
intsum=0;
while(values.hasNext()){
sum+=values.next().get();
}
output.collect(key,newIntWritable(sum));
}
}
publicintrun(String[]args)throwsException{
JobConfconf=newJobConf(getConf(),WordCountV2.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
Listother_args=newArrayList();
for(inti=0;iif("-skip".equals(args[i])){
DistributedCache.addCacheFile(newPath(args[++i]).toUri(),conf);
conf.setBoolean("wordcount.skip.patterns",true);
}else{
other_args.add(args[i]);
}
}
FileInputFormat.setInputPaths(conf,newPath(other_args.get(0)));
FileOutputFormat.setOutputPath(conf,newPath(other_args.get
(1)));
JobClient.runJob(conf);
return0;
}
publicstaticvoidmain(String[]args)throwsException{
intres=ToolRunner.run(newCo