我想创建我自己的地图减少工作。
map类的输出是:文本(键)、文本(值)
reduce类的输出是:文本,不可扭曲
我试图通过以下方式实现它:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class artistandTrack {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
String[] names=line.split(" ");
Text artist_name = new Text(names[2]);
Text track_name = new Text(names[3]);
output.collect(artist_name,track_name);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += 1;
Text x1=values.next();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(artistandTrack.class);
conf.setJobName("artisttrack");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
当我尝试运行它时,它会显示以下输出并终止
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/10/17 06:09:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/10/17 06:09:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/10/17 06:09:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/10/17 06:09:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/10/17 06:09:19 INFO mapred.FileInputFormat: Total input paths to process : 1
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: number of splits:1
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local803195645_0001
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
14/10/17 06:09:20 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/10/17 06:09:20 INFO mapreduce.Job: Running job: job_local803195645_0001
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Waiting for map tasks
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Starting task: attempt_local803195645_0001_m_000000_0
14/10/17 06:09:20 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]
14/10/17 06:09:20 INFO mapred.MapTask: Processing split: hdfs://localhost:54310/project5/input/sad.txt:0+272
14/10/17 06:09:21 INFO mapred.MapTask: numReduceTasks: 1
14/10/17 06:09:21 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/10/17 06:09:21 INFO mapreduce.Job: Job job_local803195645_0001 running in uber mode : false
14/10/17 06:09:21 INFO mapreduce.Job: map 0% reduce 0%
14/10/17 06:09:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/10/17 06:09:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/10/17 06:09:22 INFO mapred.MapTask: soft limit at 83886080
14/10/17 06:09:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
14/10/17 06:09:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
14/10/17 06:09:25 INFO mapred.LocalJobRunner:
14/10/17 06:09:25 INFO mapred.MapTask: Starting flush of map output
14/10/17 06:09:25 INFO mapred.MapTask: Spilling map output
14/10/17 06:09:25 INFO mapred.MapTask: bufstart = 0; bufend = 120; bufvoid = 104857600
14/10/17 06:09:25 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
14/10/17 06:09:25 INFO mapred.LocalJobRunner: map task executor complete.
14/10/17 06:09:25 WARN mapred.LocalJobRunner: job_local803195645_0001
***java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307)
at artistandTrack$Reduce.reduce(artistandTrack.java:44)
at artistandTrack$Reduce.reduce(artistandTrack.java:37)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1572)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1611)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1462)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:437)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)***
14/10/17 06:09:26 INFO mapreduce.Job: Job job_local803195645_0001 failed with state FAILED due to: NA
14/10/17 06:09:26 INFO mapreduce.Job: Counters: 11
Map-Reduce Framework
Map input records=4
Map output records=4
Map output bytes=120
Map output materialized bytes=0
Input split bytes=97
Combine input records=0
Combine output records=0
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
File Input Format Counters
Bytes Read=272
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
at artistandTrack.main(artistandTrack.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
错误的课程从哪里来?
java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text
和
工作失败的原因
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
at artistandTrack.main(artistandTrack.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
我不明白哪里出了问题。任何帮助
使用波纹管中的主:
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(artistandTrack.class);
conf.setJobName("artisttrack");
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setMapperClass(Map.class);
//conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
//conf.setOutputKeyClass(Text.class);
//conf.setOutputValueClass(IntWritable.class);
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
JobClient.runJob(conf);
}
我认为问题在于:
conf.setCombinerClass(Reduce.class);
您的地图
会生成一对文本
和文本
,然后您的组合器
会获取它并生成一对文本
和不可写入,
但您的化简器
无法接受 IntWritable
作为值,因此会引发异常。尝试使用合路器
设置删除线。
如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 map 举例说明,比如我们有一个函数f(x)=x2,要把这个函数作用在一个数组[1, 2, 3, 4, 5, 6, 7, 8, 9]上,就可以用map实现如下: 由于map()方法定义在JavaSc
Python内建了map()和reduce()函数。 如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是序列,map将传入的函数依次作用到序列的每个元素,并把结果作为新的list返回。
Python内建了map()和reduce()函数。 如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是Iterable,map将传入的函数依次作用到序列的每个元素,并把结果作为新的It
我正在使用reverfit2、rxjava2和adapter-rxjava来实现我的http api调用。 如果我有很多api需要实现,并且每个单独的api实现都需要添加这两行: 我不想在每个api实现中添加它们。我想使用MyObservable作为api定义的结果类型。 我的想法如下所示: 我在https://github.com/square/reverfit/blob/master/reve
,和在Python 2中完美工作。这里有一个例子: 但是在Python 3中,我收到以下输出: 如果有人能向我解释这是为什么,我将不胜感激。 为进一步清晰起见,代码截图:
我有一个名为 Expect 的类,在你实例化它之后,你可以构建一个数据结构(为了简单起见,假设它是一棵树)。然后调用 run 方法,该方法遍历树,在每个节点上执行一些操作。这些操作需要一些时间才能完成,以便将来返回最终结果。在伪代码中,它类似于: 我想用它们通常的签名实现map和flatmap,但是它们作为参数接收的函数必须对将来返回的值进行操作。我看不出有任何方法可以实现这一点。