hadoop倒排索引实验报告.docx
《hadoop倒排索引实验报告.docx》由会员分享,可在线阅读,更多相关《hadoop倒排索引实验报告.docx(27页珍藏版)》请在冰点文库上搜索。
hadoop倒排索引实验报告
大数据技术概论实验报告
作
业
三
姓名:
郭利强
专业:
工程管理专业
学号:
2015E8009064028
目录
1.实验要求3
2.环境说明4
2.1系统硬件4
2.2系统软件4
2.3集群配置4
3.实验设计4
3.1第一部分设计4
3.2第二部分设计6
4.程序代码11
4.1第一部分代码11
4.2第二部分代码17
5.实验输入和结果21
实验输入输出结果见压缩包中对应目录21
1.实验要求
第一部分:
采用辅助排序的设计方法,对于输入的N个IP网络流量文件,计算得到文件中的各个源IP地址连接的不同目的IP地址个数,即对各个源IP地址连接的目的IP地址去重并计数
举例如下:
第二部分:
输入N个文件,生成带详细信息的倒排索引
举例如下,有4个输入文件:
–d1.txt:
catdogcatfox
–d2.txt:
catbearcatcatfox
–d3.txt:
foxwolfdog
–d4.txt:
wolfhenrabbitcatsheep
要求建立如下格式的倒排索引:
–cat—>3:
4:
{(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)}–单词—>出现该单词的文件个数:
总文件个数:
{(出现该单词的文件名,单词在该文件中的出现次数,该文件的总单词数),……}
2.环境说明
2.1系统硬件
处理器:
IntelCorei3-2350MCPU@2.3GHz×4
内存:
2GB
磁盘:
60GB
2.2系统软件
操作系统:
Ubuntu14.04LTS
操作系统类型:
32位
Java版本:
1.7.0_85
Eclipse版本:
3.8
Hadoop插件:
hadoop-eclipse-plugin-2.6.0.jar
Hadoop:
2.6.1
2.3集群配置
集群配置为伪分布模式,节点数量一个
3.实验设计
3.1第一部分设计
利用两个Map/Reduce过程,在第一个MR中,读取记录并去除重复记录,第二个MR按照辅助排序设计方法,根据源地址进行分组,统计目的地址数量。
第一个MR设计:
自定义StringPair{源地址,目的地址}类型,实现WritableComparable,在map过程读取文件,输出,reduce过程去除重复记录输出。
在第二个MR设计:
1.在Map过程读取第一个MR的输出,对value值进行拆分,并以拆分得到的源地址和目的地址初始化StringPair对象作为输出键,输出值为1。
publicvoidmap(Objectkey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
String[]records=value.toString().split("\t");
Stringsourceip=records[0];
Stringdesip=records[1];
context.write(newStringPair(sourceip,desip),one);
}
2.定义GroupComparator类,继承WritableComparator类,并重载compare方法,对Map过程输出按照StringPair.first排序,完成按照源地址分组。
publicstaticclassGroupComparatorextendsWritableComparator{
protectedGroupComparator(){
super(StringPair.class,true);
}
@Override
publicintcompare(WritableComparablew1,WritableComparablew2){
StringPairip1=(StringPair)w1;
StringPairip2=(StringPair)w2;
returnip1.getFirst().compareTo(ip2.getFirst());
}
}
3.在Reduce过程统计分组中的所有值,得到源地址连接不同目的地址数量。
publicvoidreduce(StringPairkey,Iterablevalues,Contextcontext)
throwsIOException,InterruptedException{
intsum=0;
for(IntWritableval:
values){
sum+=val.get();
}
statistic.set(sum);
context.write(key.getFirst(),statistic);
}
}
3.2第二部分设计
利用两个Map/Reduce过程,第一个MR统计各个文件中的所有单词的出现次数,以及各个文件单词总数,第二个MR根据统计结果处理加工得到单词倒排索引。
第一个MR设计:
1.在Map过程中,重写map类,利用StringTokenizer类,将map方法中的value值中存储的文本,拆分成一个个的单词,并获取文件名,以两种格式进行输出或者。
publicvoidmap(Objectkey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
//获取文件名
FileSplitfileSplit=(FileSplit)context.getInputSplit();
StringfileName=fileSplit.getPath().getName();
//获取单词在单个文件中出现次数,及文件单词总数
StringTokenizeritr=newStringTokenizer(value.toString());
for(;itr.hasMoreTokens();){
Stringword=removeNonLetters(itr.nextToken().toLowerCase());
StringfileWord=fileName+"\001"+word;
if(!
word.equals("")){
context.write(newText(fileWord),newIntWritable
(1));
context.write(newText(fileName),newIntWritable
(1));
}
}
}
2.在Reduce过程中,统计得到每个文件中每个单词的出现次数,以及每个文件的单词总数,输出。
publicvoidreduce(Textkey,Iterablevalues,Contextcontext)
throwsIOException,InterruptedException{
intsum=0;
for(IntWritableval:
values){
sum+=val.get();
}
context.write(key,newIntWritable(sum));
}
}
第二个MR设计:
1.Map过程读取第一个MR的输出,对value值进行拆分,重新组合后输出键为固定Text类型值index,值为filename+word+count或者filename+count。
publicvoidmap(Objectkey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
StringvalStr=value.toString();
String[]records=valStr.split("\t");
context.write(newText("index"),newText(records[0]+"\001"+records[1]));
}
}
2.Reduce过程中定义四个HashMap,Mapwordinfilescount,key为单词+文件名,value为单词在该文件中出现的次数;Mapfilescount,key为文件名,value为文件的单词总数;Mapwordinfiles,key为单词,value为单词在多少个文件中出现;Mapindexes,key为单词,value为倒排索引。
读取values值,根据设定分隔符拆分,判断拆分后长度如果为2,则该值为文件名+文件单词总数,将拆分后的文件名及文件单词总数,组成键值对放入Mapfilescount;拆分后长度如果为3,则该值为文件名+单词+单词在该文件中出现次数,将拆分后的文件名+单词及单词在该文件中出现次数组成键值对放入Mapwordinfilescount,同时统计单词在多少个文件中出现,并组成键值对放入Mapwordinfiles。
遍历Mapwordinfilescount,将单词作为键,“单词->出现该单词的文件个数:
总文件个数:
{(出现该单词的文件名,单词在该文件中的出现次数,该文件的总单词数)”作为值,放入Mapindexes中。
遍历Mapindexes获取倒排索引并输出全部索引。
publicvoidreduce(Textkey,Iterablevalues,Contextcontext)
throwsIOException,InterruptedException{
//拆分输入,获取单词出现在几个文件中以及在该文件中出现次数,各个文件的单词总数,总文件数
for(Textval:
values){
StringvalStr=val.toString();
String[]records=valStr.split("\001");
switch(records.length){
case2:
filescount.put(records[0],Integer.parseInt(records[1]));
break;
case3:
{
wordinfilescount.put(valStr,Integer.parseInt(records[2]));
if(!
wordinfiles.containsKey(records[1])){
wordinfiles.put(records[1],1);
}else{
wordinfiles.put(records[1],wordinfiles.get(records[1])+1);
}
};
break;
}
}
//处理获取倒排索引
for(Entryentry:
wordinfilescount.entrySet()){
StringvalStr=entry.getKey();
String[]records=valStr.split("\001");
Stringword=records[1];
if(!
indexes.containsKey(word)){
StringBuildersb=newStringBuilder();
sb.append(word)
.append("->")
.append(wordinfiles.get(word))
.append(":
")
.append(filescount.size())
.append(":
")
.append("{(")
.append(records[0])
.append(",")
.append(entry.getValue())
.append(",")
.append(filescount.get(records[0]))
.append(")");
indexes.put(word,sb.toString());
}else{
StringBuildersb=newStringBuilder();
sb.append(",(")
.append(records[0])
.append(",")
.append(entry.getValue())
.append(",")
.append(filescount.get(records[0]))
.append(")");
indexes.put(word,indexes.get(word)+sb.toString());
}
}
for(Entryentry:
indexes.entrySet()){
context.write(newText(entry.getValue()+"}"),NullWritable.get());
}
}
}
4.程序代码
4.1第一部分代码
1.IpStatistics.java
/**
*LicensedtotheApacheSoftwareFoundation(ASF)underone
*ormorecontributorlicenseagreements.SeetheNOTICEfile
*distributedwiththisworkforadditionalinformation
*regardingcopyrightownership.TheASFlicensesthisfile
*toyouundertheApacheLicense,Version2.0(the
*"License");youmaynotusethisfileexceptincompliance
*withtheLicense.YoumayobtainacopyoftheLicenseat
*
*http:
//www.apache.org/licenses/LICENSE-2.0
*
*Unlessrequiredbyapplicablelaworagreedtoinwriting,software
*distributedundertheLicenseisdistributedonan"ASIS"BASIS,
*WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.
*SeetheLicenseforthespecificlanguagegoverningpermissionsand
*limitationsundertheLicense.
*/
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.Collections;
importjava.util.Comparator;
importjava.util.HashMap;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importjava.util.StringTokenizer;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.NullWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.io.WritableComparable;
importorg.apache.hadoop.io.WritableComparator;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
importorg.apache.hadoop.util.GenericOptionsParser;
importorg.apache.hadoop.fs.Path;
publicclassIpStatistics{
//第一个Map/Reduce的map类,用于去重
publicstaticclassRemoveMapperextends
Mapper
publicvoidmap(Objectkey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
StringTokenizeritr=newStringTokenizer(value.toString());
while(itr.hasMoreTokens()){
StringnextToken=itr.nextToken();
String[]records=nextToken.split(",");
Stringsourceip=records[0].replace("<","");
Stringdestinationip=records[1].replace(">","");
context.write(newStringPair(sourceip,destinationip),NullWritable.get());
}
}
}
//第二个Map/Reduce过程map类,用于统计
publicstaticclassStatisticsMapperextends
Mapper
IntWritableone=newIntWritable
(1);
publicvoidmap(Objectkey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
String[]records=value.toString().split("\t");
Stringsourceip=records[0];
Stringdesip=records[1];
context.write(newStringPair(sourceip,desip),one);
}
}
//按照源地址分组
publicstaticclassGroupComparatorextendsWritableComparator{
protectedGroupComparator(){
super(StringPair.class,true);
}
@Override
publicintcompare(WritableComparablew1,WritableComparablew2){
StringPairip1=(StringPair)w1;
StringPairip2=(StringPair)w2;
returnip1.getFirst().compareTo(ip2.getFirst());
}
}
//第一个Map/Reduce过程reduce过程,去重
publicstaticclassRemoveReducerextends
Reducer{
publicvoidreduce(StringPairkey,Iterablevalues,Contextcontext)
throwsIOException,InterruptedException{
context.write(newText(key.toString()),NullWritable.get());
}
}
//第二个Map/Reduce过程reduce过程,统计
publicstaticclassStatisticsReducerextends
Reducer{