当前位置: 首页 > 知识库问答 >
问题:

如何定义自己的map和reduce类

子车安和
2023-03-14

我想创建我自己的地图减少工作。

map类的输出是:文本(键)、文本(值)

reduce类的输出是:文本,不可扭曲

我试图通过以下方式实现它:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class artistandTrack {

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
       String line = value.toString();
           String[] names=line.split(" ");
           Text artist_name = new Text(names[2]);
           Text track_name = new Text(names[3]);               

 output.collect(artist_name,track_name);
     }
   }

   public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, IntWritable> {
     public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       int sum = 0;
       while (values.hasNext()) {
         sum += 1;
             Text x1=values.next();
                     }
       output.collect(key, new IntWritable(sum));
     }
   }

   public static void main(String[] args) throws Exception {
     JobConf conf = new JobConf(artistandTrack.class);
     conf.setJobName("artisttrack");

     conf.setOutputKeyClass(Text.class);
     conf.setOutputValueClass(IntWritable.class);

         conf.setMapOutputKeyClass(Text.class);
         conf.setMapOutputValueClass(Text.class);

     conf.setMapperClass(Map.class);
     conf.setCombinerClass(Reduce.class);
     conf.setReducerClass(Reduce.class);

     conf.setInputFormat(TextInputFormat.class);
     conf.setOutputFormat(TextOutputFormat.class);

     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     FileOutputFormat.setOutputPath(conf, new Path(args[1]));

     JobClient.runJob(conf);
   }
}

当我尝试运行它时,它会显示以下输出并终止

    WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/10/17 06:09:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/10/17 06:09:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/10/17 06:09:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
14/10/17 06:09:18 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/10/17 06:09:19 INFO mapred.FileInputFormat: Total input paths to process : 1
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: number of splits:1
14/10/17 06:09:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local803195645_0001
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final  parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/staging/userloki803195645/.staging/job_local803195645_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/10/17 06:09:20 WARN conf.Configuration: file:/app/hadoop/tmp/mapred/local/localRunner/userloki/job_local803195645_0001/job_local803195645_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
14/10/17 06:09:20 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
14/10/17 06:09:20 INFO mapreduce.Job: Running job: job_local803195645_0001
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter set in config null
14/10/17 06:09:20 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Waiting for map tasks
14/10/17 06:09:20 INFO mapred.LocalJobRunner: Starting task: attempt_local803195645_0001_m_000000_0
14/10/17 06:09:20 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
14/10/17 06:09:20 INFO mapred.MapTask: Processing split: hdfs://localhost:54310/project5/input/sad.txt:0+272
14/10/17 06:09:21 INFO mapred.MapTask: numReduceTasks: 1
14/10/17 06:09:21 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
14/10/17 06:09:21 INFO mapreduce.Job: Job job_local803195645_0001 running in uber mode : false
14/10/17 06:09:21 INFO mapreduce.Job:  map 0% reduce 0%
14/10/17 06:09:22 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
14/10/17 06:09:22 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
14/10/17 06:09:22 INFO mapred.MapTask: soft limit at 83886080
14/10/17 06:09:22 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600  
14/10/17 06:09:22 INFO mapred.MapTask: kvstart = 26214396; length = 6553600 
14/10/17 06:09:25 INFO mapred.LocalJobRunner: 
14/10/17 06:09:25 INFO mapred.MapTask: Starting flush of map output
14/10/17 06:09:25 INFO mapred.MapTask: Spilling map output
14/10/17 06:09:25 INFO mapred.MapTask: bufstart = 0; bufend = 120; bufvoid = 104857600
14/10/17 06:09:25 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214384(104857536); length = 13/6553600
14/10/17 06:09:25 INFO mapred.LocalJobRunner: map task executor complete.
14/10/17 06:09:25 WARN mapred.LocalJobRunner: job_local803195645_0001
***java.lang.Exception: java.io.IOException: wrong value class: class         org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: wrong value class: class  org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307)
at artistandTrack$Reduce.reduce(artistandTrack.java:44)
at artistandTrack$Reduce.reduce(artistandTrack.java:37)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1572)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1611)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1462)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:437)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)***


14/10/17 06:09:26 INFO mapreduce.Job: Job job_local803195645_0001 failed with state FAILED due to: NA
14/10/17 06:09:26 INFO mapreduce.Job: Counters: 11
Map-Reduce Framework
    Map input records=4
    Map output records=4
    Map output bytes=120
    Map output materialized bytes=0
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
File Input Format Counters 
    Bytes Read=272
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
at artistandTrack.main(artistandTrack.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

错误的课程从哪里来?

java.lang.Exception: java.io.IOException: wrong value class: class         org.apache.hadoop.io.IntWritable is not class org.apache.hadoop.io.Text

工作失败的原因

Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:836)
at artistandTrack.main(artistandTrack.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at      sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

我不明白哪里出了问题。任何帮助

共有2个答案

葛烨
2023-03-14

使用波纹管中的主:

 public static void main(String[] args) throws Exception {
 JobConf conf = new JobConf(artistandTrack.class);
 conf.setJobName("artisttrack");

 FileInputFormat.setInputPaths(conf, new Path(args[0]));
 FileOutputFormat.setOutputPath(conf, new Path(args[1]));

 conf.setMapperClass(Map.class);
 //conf.setCombinerClass(Reduce.class);
 conf.setReducerClass(Reduce.class);

 //conf.setOutputKeyClass(Text.class);
 //conf.setOutputValueClass(IntWritable.class);

 conf.setMapOutputKeyClass(Text.class);
 conf.setMapOutputValueClass(Text.class);

 conf.setInputFormat(TextInputFormat.class);
 conf.setOutputFormat(TextOutputFormat.class);

 JobClient.runJob(conf);
}
左丘阳晖
2023-03-14

我认为问题在于:

conf.setCombinerClass(Reduce.class);

您的地图会生成一对文本文本,然后您的组合器会获取它并生成一对文本不可写入,但您的化简器无法接受 IntWritable 作为值,因此会引发异常。尝试使用合路器设置删除线。

 类似资料:
  • 如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 map 举例说明,比如我们有一个函数f(x)=x2,要把这个函数作用在一个数组[1, 2, 3, 4, 5, 6, 7, 8, 9]上,就可以用map实现如下: 由于map()方法定义在JavaSc

  • Python内建了map()和reduce()函数。 如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是序列,map将传入的函数依次作用到序列的每个元素,并把结果作为新的list返回。

  • Python内建了map()和reduce()函数。 如果你读过Google的那篇大名鼎鼎的论文“MapReduce: Simplified Data Processing on Large Clusters”,你就能大概明白map/reduce的概念。 我们先看map。map()函数接收两个参数,一个是函数,一个是Iterable,map将传入的函数依次作用到序列的每个元素,并把结果作为新的It

  • 我正在使用reverfit2、rxjava2和adapter-rxjava来实现我的http api调用。 如果我有很多api需要实现,并且每个单独的api实现都需要添加这两行: 我不想在每个api实现中添加它们。我想使用MyObservable作为api定义的结果类型。 我的想法如下所示: 我在https://github.com/square/reverfit/blob/master/reve

  • ,和在Python 2中完美工作。这里有一个例子: 但是在Python 3中,我收到以下输出: 如果有人能向我解释这是为什么,我将不胜感激。 为进一步清晰起见,代码截图:

  • 好的,在laravel 4中,如果我想添加自己的自定义类,例如:库\myFunction.php然后我执行以下步骤: 添加myFunctions.php到app/库/myFunctiosn.php 在app/start/global.php,在ClassLoader::addDirectory(数组(,我添加app_path()。 为了在我的刀片视图中调用它,我添加了以下代码 它是有效的。 但是如