扩展资料/Hadoop的MultipleOutputFormat使用
优质
小牛编辑
136浏览
2023-12-01
一、背景
Hadoop的MapReduce中多文件输出默认是TextOutFormat,输出为part-r- 00000和part-r-00001依次递增的文件名。hadoop提供了
MultipleOutputFormat类,重写该类可实现定制自定义的文件名。
二、技术细节
1.环境:hadoop 0.19(目前hadoop 0.20.2对MultipleOutputFormat支持不好),linux。
2.实现MultipleOutputFormat代码例子如下:
public class WordCount { public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable count = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, count); } } } public static class IntSumReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } result.set(sum); output.collect(key, result); } } public static class WordCountOutputFormat extends MultipleOutputFormat<Text, IntWritable> { private TextOutputFormat<Text, IntWritable> output = null; @Override protected RecordWriter<Text, IntWritable> getBaseRecordWriter( FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { if (output == null) { output = new TextOutputFormat<Text, IntWritable>(); } return output.getRecordWriter(fs, job, name, arg3); } @Override protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) { char c = key.toString().toLowerCase().charAt(0); if (c >= 'a' && c <= 'z') { return c + ".txt"; } return "result.txt"; } } public static void main(String[] args) throws Exception { JobConf job = new JobConf(WordCount.class); job.setJobName("wordcount"); String[] otherArgs = new GenericOptionsParser(job, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormat(WordCountOutputFormat.class);// 设置输出格式 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); JobClient.runJob(job); } }3.在main函数中设置输出格式,job.setOutputFormat(WordCountOutputFormat.class);实现WordCountOutputFormat类继承MultipleOutputFormat类,重写getBaseRecordWriter和generateFileNameForKeyValue函数,在generateFileNameForKeyValue函数中参数String name为默认的输出part-00000:
public static class WordCountOutputFormat extends MultipleOutputFormat<Text, IntWritable> { private TextOutputFormat<Text, IntWritable> output = null; @Override protected RecordWriter<Text, IntWritable> getBaseRecordWriter( FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException { if (output == null) { output = new TextOutputFormat<Text, IntWritable>(); } return output.getRecordWriter(fs, job, name, arg3); } @Override protected String generateFileNameForKeyValue(Text key, IntWritable value, String name) { char c = key.toString().toLowerCase().charAt(0); if (c >= 'a' && c <= 'z') { return c + ".txt"; } return "result.txt"; } }4.程序结果为:
-rw-r--r-- 2 root supergroup7 2010-08-07 17:44 /hua/multipleoutput1/c.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:44 /hua/multipleoutput1/h.txt -rw-r--r-- 2 root supergroup7 2010-08-07 17:44 /hua/multipleoutput1/k.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:44 /hua/multipleoutput1/m.txt -rw-r--r-- 2 root supergroup 28 2010-08-07 17:44 /hua/multipleoutput1/result.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:44 /hua/multipleoutput1/t.txt如果generateFileNameForKeyValue返回return c + "_" + name + ".txt";结果为:
-rw-r--r-- 2 root supergroup7 2010-08-07 17:23 /hua/multipleoutput/c_part-00000.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:23 /hua/multipleoutput/h_part-00000.txt -rw-r--r-- 2 root supergroup7 2010-08-07 17:23 /hua/multipleoutput/k_part-00000.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:23 /hua/multipleoutput/m_part-00000.txt -rw-r--r-- 2 root supergroup 28 2010-08-07 17:23 /hua/multipleoutput/result.txt -rw-r--r-- 2 root supergroup6 2010-08-07 17:23 /hua/multipleoutput/t_part-00000.txt
三、总结
虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。