Hadoop File Input and File Output (学步园)
This article implements control over how Hadoop reads input files and writes output files. Two things are accomplished:

1. The record format the map reads is rewritten: the default <LongWritable, Text> becomes <Text, Text>.
2. The output format is rewritten so that each input file corresponds to exactly one output file, and the output file carries the same name as the input file.

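In mapper terms, the first change looks like this. Both skeletons are illustrative, written for this rewrite rather than taken from the article:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Default TextInputFormat: the mapper sees one line per record.
class LineMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) {
        // key = byte offset of the line in the file, value = the line itself
    }
}

// With the InputFormat defined below: the mapper sees one record per file.
class WholeFileMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context) {
        // key = HDFS path of the file, value = the entire file contents
    }
}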
Straight to the code. First, coAuInputFormat:

package an.hadoop.code.audit;

/*
 * The function of this class is to revise the input format:
 * the <LongWritable, Text> seen by the map becomes <Text, Text>.
 */
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class coAuInputFormat extends FileInputFormat<Text, Text> {

    private CompressionCodecFactory compressionCodecs = null;

    // old-API leftover: nothing in the new API calls configure()
    public void configure(Configuration conf) {
        compressionCodecs = new CompressionCodecFactory(conf);
    }

    /**
     * Never split a file: each file must be processed as a whole, so every
     * file becomes exactly one split, even when it exceeds 64 MB (the size
     * of one Hadoop block).
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new coAuRecordReader(context, split);
    }
}

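Before the reader class, it may help to see where the input format plugs into a job. This driver is a sketch of my own, not part of the article; CodeAuditDriver, the job name, and the pass-through mapper are placeholders, and it assumes everything lives in the an.hadoop.code.audit package:

package an.hadoop.code.audit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CodeAuditDriver {

    // With coAuInputFormat, the map input key is the file path and the
    // value is the whole file, both as Text.
    public static class PassThroughMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "code audit");          // Job.getInstance(conf, ...) on Hadoop 2+
        job.setJarByClass(CodeAuditDriver.class);
        job.setInputFormatClass(coAuInputFormat.class); // one <Text, Text> record per file
        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0);                       // map-only: records go straight to the output format
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // job.setOutputFormatClass(FileNameOutputFormat.class); // see the sketch at the end
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}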
Next, coAuRecordReader, which turns a whole file into a single (Text, Text) record:

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class coAuRecordReader extends RecordReader<Text, Text> {

    private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private byte[] buffer;
    private String keyName;
    private FSDataInputStream fileIn;
    private Text key = null;
    private Text value = null;

    public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit)
            throws IOException {
        Configuration job = context.getConfiguration();
        FileSplit split = (FileSplit) genericSplit;
        start = split.getStart();            // confirms that each file is one split
        end = split.getLength() + start;
        final Path path = split.getPath();
        keyName = path.toString();           // the key is the file path
        LOG.info("filename in hdfs is : " + keyName);  // written to the task log (where does one view it?)
        final FileSystem fs = path.getFileSystem(job);
        fileIn = fs.open(path);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];        // sized to hold the whole file
        this.pos = start;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // repeats the constructor's setup; the framework calls initialize() itself
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        keyName = file.toString();           // the key is the file path
        LOG.info("filename in hdfs is : " + keyName);
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];
        this.pos = start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        key.clear();
        key.set(keyName);    // set the key
        value.clear();       // clear the value

[The source text breaks off here, at "while (pos"; the rest of nextKeyValue() and the remaining RecordReader methods were lost in extraction.]

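The missing part can be reconstructed with some confidence: buffer is already sized to the whole file, so the loop that follows must read the file into it and emit it as the single value. Everything below is my reconstruction, not the author's text; the guard at the top is added so that exactly one record is emitted per file:

    // --- reconstructed from here on (not the author's original text) ---
    if (pos >= end) {
        return false;                       // the single record was already emitted
    }
    while (pos < end) {                     // read the entire file into buffer
        int bytesRead = fileIn.read(buffer, (int) (pos - start), (int) (end - pos));
        if (bytesRead == -1) {
            break;                          // unexpected end of stream
        }
        pos += bytesRead;
    }
    value.set(buffer, 0, buffer.length);    // value = the whole file contents
    return true;
}

@Override
public Text getCurrentKey() {
    return key;                             // the HDFS path of the file
}

@Override
public Text getCurrentValue() {
    return value;                           // the file contents
}

@Override
public float getProgress() {
    return (end == start) ? 1.0f : (pos - start) / (float) (end - start);
}

@Override
public void close() throws IOException {
    if (fileIn != null) {
        fileIn.close();
    }
}
}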
Then the output side. The source is garbled again where this class begins, so its package, imports, and declaration are lost; from the abstract generateFileNameForKeyValue() method and the MultiRecordWriter inner class, this is an abstract multiple-file output format, reconstructed below under the assumed name MultipleOutputFormat. The author's surviving comment notes that the default would be TextOutputFormat.

// Package, imports, and class header reconstructed; the originals were lost in extraction.
package an.hadoop.code.audit;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// the default output format would be TextOutputFormat
public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job)); // job + output path
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        // determine the output path
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            // use the committer's task work path
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf); // the configured output path
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /**
     * Determine the output file name (with extension) from key, value and conf.
     * Abstract: a concrete subclass overrides it.
     */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** cache of RecordWriters, one per output file name */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** the output directory */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // every cached writer must be closed
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // derive the output file name for this record
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName); // cached writer, if any
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, base

[The downloadable preview of the source ends here, in the middle of getBaseRecordWriter(); the remainder of the class is not included.]
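Even truncated, the intent is clear: the article's second goal only needs a small concrete subclass supplying generateFileNameForKeyValue(). The following sketch is mine, not the source's — the name FileNameOutputFormat is hypothetical — and it assumes the map preserves coAuRecordReader's key, the HDFS path of the input file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

// Hypothetical subclass (not in the source): names each output file after the
// input file whose full path travels in the key set by coAuRecordReader.
public class FileNameOutputFormat extends MultipleOutputFormat<Text, Text> {
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String path = key.toString();                      // e.g. hdfs://nn/in/a.log
        return path.substring(path.lastIndexOf('/') + 1);  // -> a.log
    }
}

With reduces disabled (or a reducer that keeps the path key), every record then lands in a writer named after its originating file, which is exactly the one-output-file-per-input-file behavior described at the top.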
