Hadoop 文件输入和文件输出(学步园) 本文完成对 Hadoop 输入、输出文件方式的控制,完成的功能如下:1、改写 map 读取数据的格式:默认的 <行偏移量, 行内容> 变为 <文件名, 文件内容>;2、改写输出的格式:输出文件时每个输入文件对应一个输出文件,输出文件的名字跟输入文件名字相同。直接上代码:coAuInputFormatpackage an.hadoop.code.audit;/* * The function of this class is revise the input format * the - map * of the map * */import java.io.IOExce
2、ption;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import press.CompressionCodec;import press.CompressionCodecFactory;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapreduce.I
3、nputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class coAuInputFormat extends FileInputFormat private CompressionCodecFactory compressionCodecs = null; public void conf
4、igure(Configuration conf) compressionCodecs = new CompressionCodecFactory(conf); /* * brief isSplitable 不对文件进行切分,必须对文件整体进行处理 * * param fs * param file * * return false */ protected boolean isSplitable(FileSystem fs, Path file) CompressionCodec codec = compressionCodecs.getCodec(file); return false;/
5、以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片 Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException / TODO Auto-generated method stub return new coAuRecordReader(context, split); coAuRecordReaderpackage an
6、.hadoop.code.audit;import java.io.IOException;import mons.logging.Log;import mons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;impo
7、rt press.CompressionCodec;import press.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class coAuRecordReader exte
8、nds RecordReader private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName(); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private byte buffer; private String keyName; private FSDataInputStream fileIn; private
9、 Text key = null; private Text value = null; public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException / TODO Auto-generated constructor stub Configuration job = context.getConfiguration(); FileSplit split = (FileSplit) genericSplit; start = (FileSplit) split).g
10、etStart(); /从中可以看出每个文件是作为一个split的 end = split.getLength() + start; final Path path = split.getPath();/ keyName = path.toString();/key 的值是文件路径 LOG.info(filename in hdfs is : + keyName);/写入日志文件,去哪里查看日志呢? final FileSystem fs = path.getFileSystem(job); fileIn = fs.open(path); fileIn.seek(start); buffer
11、= new byte(int)(end - start); this.pos = start; /*if(key = null) key = new Text(); key.set(keyName); if(value = null) value = new Text(); value.set(utf8); */ /coAuRecordReader() Override public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedExcept
12、ion / TODO Auto-generated method stub FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); /this.maxLineLength = job.getInt(mapred.linerecordreader.maxlength,Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.
13、getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); keyName = file.toString();/key 的值是文件路径 LOG.info(filename in hdfs is : + keyName);/写入日志文件,去哪里查看日志呢? final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file);
14、fileIn.seek(start); buffer = new byte(int)(end - start); this.pos = start; Override public boolean nextKeyValue() throws IOException, InterruptedException / TODO Auto-generated method stub /这个是需要做的 if(key = null) key = new Text(); key.set(keyName); if(value = null) value = new Text(); key.clear(); k
15、ey.set(keyName);/ set the key value.clear();/clear the value while(pos extends FileOutputFormat /默认的是TextOutputFormat private MultiRecordWriter writer = null; public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException if (writer = null) writer = new MultiRec
16、ordWriter(job, getTaskOutputPath(job);/job ,output path return writer; private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException /获得输出路径 Path workPath = null; OutputCommitter committer = super.getOutputCommitter(conf); if (committer instanceof FileOutputCommitter) /如果是 workPath = (F
17、ileOutputCommitter) committer).getWorkPath();/工作路径 else Path outputPath = super.getOutputPath(conf);/获得conf路径 if (outputPath = null) throw new IOException(Undefined job output-path); workPath = outputPath; return workPath; / /*通过key, value, conf来确定输出文件名(含扩展名)*/ protected abstract String generateFile
18、NameForKeyValue(K key, V value, Configuration conf);/抽象方法,被之后的方法重写了 public class MultiRecordWriter extends RecordWriter /*RecordWriter的缓存*/ private HashMap recordWriters = null; private TaskAttemptContext job = null; /*输出目录*/ private Path workPath = null; public MultiRecordWriter(TaskAttemptContext
19、job, Path workPath) /构造函数 super(); this.job = job; this.workPath = workPath; recordWriters = new HashMap(); Override public void close(TaskAttemptContext context) throws IOException, InterruptedException /多个writer都要关掉 Iterator values = this.recordWriters.values().iterator(); while (values.hasNext()
20、values.next().close(context); this.recordWriters.clear(); Override public void write(K key, V value) throws IOException, InterruptedException /得到输出文件名 String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration();/生成输出文件名 RecordWriter rw = this.recordWriters.get(baseName);/? if (r
21、w = null) rw = getBaseRecordWriter(job, baseName);/ this.recordWriters.put(baseName, rw); rw.write(key, value); / $mapred.out.dir/_temporary/_$taskid/$nameWithExtension private RecordWriter getBaseRecordWriter(TaskAttemptContext job, String baseName) throws IOException, InterruptedException Configur
22、ation conf = job.getConfiguration(); boolean isCompressed = getCompressOutput(job); String keyValueSeparator = ,; RecordWriter recordWriter = null; if (isCompressed) Class codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf); Path file = new Path(workPath, base
copyright@ 2008-2023 冰点文库 网站版权所有
经营许可证编号:鄂ICP备19020893号-2