我正在使用hadoop mapreduce,我想计算两个文件。我的第一个Map / Reduce迭代是给我一个文件,其文件具有ID号,如下所示:
A 30
D 20
我的目标是使用文件中的该ID与另一个文件相关联,并使用三重奏输出另一个:ID,Number,Name,如下所示:
A ABC 30
D EFGH 20
但是我不确定使用Map Reduce是否是最好的方法。例如,使用文件读取器读取第二个输入文件并通过ID获得名称会更好吗?还是可以使用Map
Reduce做到这一点?
如果是这样,我正在尝试找出方法。我尝试了MultipleInput解决方案:
MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
TextInputFormat.class, FlightsModeMapper.class);
但是我想不出任何将两者结合起来并获得我想要的输出的解决方案。我现在所拥有的方式就是给我这样的列表:
A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20
在我的最后一次减少后,我得到这个:
N125DL 767-332
N125DL 7 ,
N126AT 737-76N
N126AT 19 ,
N126DL 767-332
N126DL 1 ,
N127DL 767-332
N127DL 7 ,
N128DL 767-332
N128DL 3
我想要这个:N127DL 7 767-332。而且,我不要那些不结合的东西。
这是我的reduce班:
公共类FlightsByCarrierReducer2扩展了Reducer {
String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
context.write(token, new Text(merge));
}
}
更新:
http://stat-computing.org/dataexpo/2009/the-data.html这是我正在使用的示例。
我正在尝试:TailNum和Canceled(为1或0)获取与TailNum对应的模型名称。我的模型文件包含TailNumb,Model和其他内容。我当前的输出是:
N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2
首先是钥匙,其次是模型,已取消航班的钥匙,模型下方的钥匙
我想要一个三重键,取消的型号,因为我想要每个型号的取消数量
您可以使用ID作为两个映射器的键来加入它们。您可以像这样编写地图任务
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
{
//Get the line
//split the line to get ID seperate
//word1 = A
//word2 = 30
//Likewise for A ABC
//word1 = A
//word2 = ABC
context.write(word1, word2);
}
我认为您可以重新使用相同的Map任务。然后编写一个普通的Reducer作业,在该作业中Hadoop框架将关键数据分组。因此,您将能够获得ID作为密钥。您可以缓存其中一个值,然后进行连接。
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
{
int i =0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
valEmit.set(merge);
context.write(key, valEmit);
}
最后,您可以编写Driver类
public int run(String[] args) throws Exception {
Configuration c=new Configuration();
String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
Path p1=new Path(files[0]);
Path p2=new Path(files[1]);
Path p3=new Path(files[2]);
FileSystem fs = FileSystem.get(c);
if(fs.exists(p3)){
fs.delete(p3, true);
}
Job job = new Job(c,"Multiple Job");
job.setJarByClass(MultipleFiles.class);
MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
job.setReducerClass(MultipleReducer.class);
.
.
}
您可以在此处找到示例
希望这可以帮助。
更新
输入1
A 30
D 20
输入2
A ABC
D EFGH
输出量
A ABC 30
D EFGH 20
Mapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @author sreeveni
*
*/
public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
Text keyEmit = new Text();
Text valEmit = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String parts[] = line.split(" ");
keyEmit.set(parts[0]);
valEmit.set(parts[1]);
context.write(keyEmit, valEmit);
}
}
Reducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @author sreeveni
*
*/
public class ReducerJoin extends Reducer<Text, Text, Text, Text> {
Text valEmit = new Text();
String merge = "";
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String character = "";
String number = "";
for (Text value : values) {
// ordering output
String val = value.toString();
char myChar = val.charAt(0);
if (Character.isDigit(myChar)) {
number = val;
} else {
character = val;
}
}
merge = character + " " + number;
valEmit.set(merge);
context.write(key, valEmit);
}
}
驾驶舱
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author sreeveni
*
*/
public class Driver extends Configured implements Tool {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
// checking the arguments count
if (args.length != 3) {
System.err
.println("Usage : <inputlocation> <inputlocation> <outputlocation> ");
System.exit(0);
}
int res = ToolRunner.run(new Configuration(), new Driver(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
String source1 = args[0];
String source2 = args[1];
String dest = args[2];
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", " "); // changing default
// delimiter to user
// input delimiter
FileSystem fs = FileSystem.get(conf);
Job job = new Job(conf, "Multiple Jobs");
job.setJarByClass(Driver.class);
Path p1 = new Path(source1);
Path p2 = new Path(source2);
Path out = new Path(dest);
MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
Mapper1.class);
job.setReducerClass(ReducerJoin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
/*
* delete if exist
*/
if (fs.exists(out))
fs.delete(out, true);
TextOutputFormat.setOutputPath(job, out);
boolean success = job.waitForCompletion(true);
return success ? 0 : 1;
}
}
嗨,我有一个map-reduce程序,它在每个递归步骤中获取reducer的输出。但我还需要在每次递归中输出另一个结果。 输入1--- 输出1--- 输出2--- 输出3--- 作为我需要的最终输出:输出11,输出22,输出33,输出44和输出4 像这样,每个步骤都有两个输出文件,其中一个用于下一次迭代,另一个用于输出。 我正在使用序列文件作为文本输入格式。 任何帮助,谢谢。
问题内容: 我是Hadoop的新手,正在尝试弄清楚它是如何工作的。至于练习,我应该实现类似于WordCount- Example的东西。任务是读入多个文件,执行WordCount并为每个输入文件编写一个输出文件。Hadoop使用组合器,将map- part的输出改编为reducer的输入,然后写入一个输出文件(我猜每个正在运行的实例)。我想知道是否可以为每个输入文件写入一个输出文件(因此保留inp
问题内容: 我正在尝试将我的reducer的结果输出到多个文件。数据结果全部包含在一个文件中,其余结果根据它们所尊重的文件中的类别进行划分。我知道使用0.18可以使用MultipleOutputs做到这一点,并且它尚未被删除。但是,我正在尝试使我的应用程序兼容0.20+。现有的多输出功能仍然需要JobConf(我的应用程序使用Job和Configuration)。如何根据密钥生成多个输出? 问题答
问题内容: 我的工作按日期处理数据,需要将输出写入特定的文件夹结构。当前的期望是生成如下结构: 等等 在任何时候,我最多只能获得12个月的数据,因此,我正在使用类在驱动程序中使用以下函数创建12个输出: 在reducer中,我添加了一个清理功能,以将生成的输出移动到适当的目录。 问题:在将输出从_temporary目录移动到输出目录之前,reducer的清除功能正在执行。因此,由于所有数据仍位于_
在Hadoop MapReduce中是否有可能使用多个不同的映射器有多个输入?每个映射器类都在一组不同的输入上工作,但它们都会发出由同一个减速器使用的键值对。请注意,我不是在这里谈论链接映射器,我是在谈论并行运行不同的映射器,而不是顺序运行。
本文向大家介绍Hadoop MapReduce多输出详细介绍,包括了Hadoop MapReduce多输出详细介绍的使用技巧和注意事项,需要的朋友参考一下 Hadoop MapReduce多输出 FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制