我正试图在一些维基百科转储(以压缩的bz2形式)上运行一个带有java Mapper/Reducer的hadoop流作业。我正在尝试使用WikiHadoop,这是Wikimedia最近发布的一个界面。
WikiReader_Mapper。JAVA
package courseproj.example;
// Mapper: emits (token, 1) for every article occurrence.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {
// Reuse objects to save overhead of object creation.
private final static Text KEY = new Text();
private final static IntWritable VALUE = new IntWritable(1);
@Override
public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
throws IOException {
KEY.set("article count");
collector.collect(KEY, VALUE);
}
}
维基阅读器。JAVA
package courseproj.example;
//Reducer: sums up all the counts.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
private final static IntWritable SUM = new IntWritable();
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
SUM.set(sum);
collector.collect(key, SUM);
}
}
我正在运行的命令是
hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
-libjars lib2/wikihadoop-0.2.jar \
-D mapreduce.input.fileinputformat.split.minsize=300000000 \
-D mapreduce.task.timeout=6000000 \
-D org.wikimedia.wikihadoop.previousRevision=false \
-input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
-output out \
-inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
-mapper WikiReader_Mapper \
-reducer WikiReader_Reducer
我收到的错误信息是
Error: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)
Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)
我更熟悉新的hadoop API和旧的。由于我的mapper和reducer代码位于两个不同的文件中,我应该在哪里定义作业的JobConf配置参数,同时遵循hadoop streaming的命令结构(显式设置mapper和reducer类)。有没有一种方法可以将mapper和reducer代码打包成一个类(扩展配置和实现工具,这是新API中所做的)并将类名传递给hadoop流式命令行,而不是分别设置map和reduce类的名称?
流媒体使用旧的API(org.apache.hadoop.mapred
)——但您的mapper和reducer类扩展了新的API类(org.apache.hadoop.mapreduce
)。
尝试更改映射器以实现org。阿帕奇。hadoop。映射。Mapper
,以及用于实现组织的reducer。阿帕奇。hadoop。映射。减速机,例如:
package courseproj.example;
// Mapper: emits ("article", 1) for every article occurrence.
public class WikiReader_Mapper implements Mapper<Text, Text, Text, IntWritable> {
// Reuse objects to save overhead of object creation.
private final static Text KEY = new Text();
private final static IntWritable VALUE = new IntWritable(1);
@Override
public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
throws IOException, InterruptedException {
KEY.set("article count");
collector.collect(KEY, VALUE);
}
}
我决定创建自己的WritableComparable类来学习Hadoop如何使用它。因此,我创建了一个带有两个实例变量(orderNumber cliente)的Order类,并实现了所需的方法。我还为getters/setters/hashcode/equals/toString使用了Eclipse生成器。 相比较而言,我决定只使用orderNumber变量。 我创建了一个简单的MapReduc
我在使用MapReduce作业时遇到了问题。我的函数确实运行并产生了所需的输出。但是,函数没有运行。该函数似乎从未被调用过。我使用Text作为键,Text作为值。但我不认为这会导致问题。 输入文件的格式如下: 我想将一行的第二个日期提取为<code>Text<code>并将其用作reduce的键。键的值将是同一行中最后两个<code>float<code>值的组合 即: 以便可以将值部分视为由空白
我正在学习一些MapReduce,但是我遇到了一些问题,情况是这样的:我有两个文件:“users”包含一个用户列表,其中包含一些用户数据(性别、年龄、国家等)...)文件看起来像这样: “歌曲”包含所有用户收听的歌曲的数据(用户ID,收听日期和时间,艺术家ID,艺术家姓名,歌曲ID,歌曲标题): 目标是在某些国家找到k首最受欢迎的歌曲。k和输入中提供的国家列表。 我决定为映射器使用Multiple
减速器 自定义可写类
我试图将批量加载map-reduce定制到HBase中,我遇到了reducer的问题。起初我认为我没有写好reducer,但是在reducer中抛出运行时异常并看到代码工作时,我意识到reducer根本没有运行。到目前为止,我看不出这个问题的一些常见答案有什么问题; 我的配置将mapoutput和output分开。 我的减速器和映射器具有覆盖功能。 我有Iterable,我的reducer输入是(
我只是使用3机器集群测试单词计数示例。我的代码与此示例相同,但以下代码除外: