当前位置: 首页 > 知识库问答 >
问题:

第一个Reduce任务的输出值作为mapReduce中的对

董飞
2023-03-14

我希望我的第一个reduce任务生成类似smth的(当然,

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class AvgGrading {

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "avg grading");
        job.setJarByClass(AvgGrading.class);
        job.setMapperClass(MapForAverage.class);
        job.setCombinerClass(ReduceForAverage.class);
        job.setNumReduceTasks(2);
        job.setReducerClass(ReduceForFinal.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Object.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForAverage extends Mapper<LongWritable, Text, LongWritable, Object> {

        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String [] word = value.toString().split(", ");
            float grade = Integer.parseInt(word[1]);
            int course = Integer.parseInt(word[0]);
            Map <Float,Long> m = new HashMap<Float,Long>();
            m.put(grade, (long) 1);
            con.write(new LongWritable(course), m);
        }
    }

    public static class ReduceForAverage extends Reducer<LongWritable, Object, LongWritable, Object> {
        private FloatWritable result = new FloatWritable();
        public void reduce(LongWritable course, Map<Float,Long> values, Context con)
                throws IOException, InterruptedException {

            Map <Float,Long> m = new HashMap<Float,Long>();

            float sum = 0;
            long count =0;
            for (Map.Entry<Float, Long> entry : values.entrySet()) {
                sum += entry.getKey();
                count++;
            }
            m.put(sum, count);

            con.write(course, m);
        }
    }

    public static class ReduceForFinal extends Reducer<LongWritable, Object, LongWritable, FloatWritable> {
        private FloatWritable result = new FloatWritable();
        public void reduce(LongWritable course, Map<Long,Float>values, Context con)
                throws IOException, InterruptedException {

            long key = 0;
            float value=0;

            for ( Map.Entry<Long, Float> entry : values.entrySet()) {
                 key = entry.getKey();
                 value = entry.getValue();
            }
            float res= key/value;

            con.write(course, new FloatWritable(res));
        }
    }
}

请注意,我无法循环访问可迭代

错误代码是:

Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer

java.lang.NullPointerException

第二减速机故障


共有1个答案

刁茂才
2023-03-14

Map没有实现可写,你说你的组合器和减缩器输入值的类是Object,而你正在发出Map。你只需要为此创建一个自定义类。请记住,如果要在hadoop中发出某些东西,自定义类必须实现可写。你可以这样做:

public class Counter implements Writable {

       private float sum;
       private long count;

       public Counter(float sum, long count){
              this.sum = sum;
              this.count = count;
       }

       /* Methods to get and set private variables of the class */

       public float getSum() {
              return sum;
       }

       public void setSum(float sumValue) {
              sum=sumValue;
       }

       public long getCount() {
              return count;
       }

       public void setCount(long countValue) {
              count=countValue;
       }

       /* Methods to serialize and deserialize the contents of the
          instances of this class */

       @Override /* Serialize the fields of this object to out */ 
       public void write(DataOutput out) throws IOException{
              out.writeFloat(sum);
              out.writeLong(count);
       }

      @Override /* Deserialize the fields of this object from in */
      public void readFields(DataInputin) throws IOException{
                  sum=in.readFloat();
                  count=in.readLong();
       }
       }

因此,在您的第一个映射器中,您可以通过以下方式创建并发出计数器:

       Counter counter = new Counter(grade, 1);
       con.write(course, counter);

此时,在第一个reducer中,您将有一个表示课程的键和一个可迭代值,该值对于所有计数器都是可迭代的,使用该可迭代值可以计算平均值。记住更新mapper和reducers类参数,使其与新参数一致。

 类似资料:
  • 问题内容: 我是Hadoop的新手,正在尝试弄清楚它是如何工作的。至于练习,我应该实现类似于WordCount- Example的东西。任务是读入多个文件,执行WordCount并为每个输入文件编写一个输出文件。Hadoop使用组合器,将map- part的输出改编为reducer的输入,然后写入一个输出文件(我猜每个正在运行的实例)。我想知道是否可以为每个输入文件写入一个输出文件(因此保留inp

  • 我有几个<code>(标题,文本) 现在我想在这些有序对的文本字段上实现单词计数。 所以我的最终输出应该是这样的: 总而言之,我想在第一个mapduce的输出记录上单独实现wordcount。有人能给我一个好方法吗?或者我如何链接第二个map duce作业来创建上述输出或更好地格式化它? 下面是代码,从github借用并做了一些更改 我们在线找到的字数代码对所有文件进行字数统计,并给出输出。我想分

  • 问题内容: 在Hadoop MapReduce中,对于中间输出(由map()生成),我希望中间输出的值成为以下对象。 我该怎么做。我应该创建自己的可写类吗? 我是MapReduce的新手。 谢谢。 问题答案: 您可以编写自定义类型,将其作为映射器值发出。但是无论您要发出什么值,都必须实现可写接口。您可以执行以下操作: 另外,您可以使用Avro序列化框架。

  • 又一个火花问题给你! 因此,我使用mllibs原生SVMWithSGD训练一个SVM模型。训练rdd被划分为114个分区,每个分区大约有2700个实例。 每个迭代工作分为两个阶段。第一阶段,与我的节点线性扩展。然而,第二阶段只分为10个左右的任务,比可用的V核少得多,所以这个阶段不是线性扩展的。 我想把这个阶段分成更多的任务,让更多的执行者可以并行地在上面工作。有办法着手做那件事吗? 此外,还有一

  • 我的作业当前使用(如本文所述)以以下结构生成输出: 输出的基本路径是但是,另一个作业(job2)生成类似的数据,我希望这个作业(job1)的输出与另一个作业(job2)的输出合并。 我试图将生成的输出合并到一个公共输出目录中,该目录包含上述结构和两个作业的组合输出。是否有一种方法可以在作业本身中手动运行shell命令? 欣赏任何洞察力。

  • 减速器正在计算所有相同的值: 然而,当我在hadoop上运行一个更大的数据集时,似乎丢失了一半的结果。当我在本地机器上使用cat input mapper.py sort reducer.py>out-local测试它时,如果输入合理地很小,它工作得很好,但是在更大的数据集上(例如1M个条目),本地输出文件的条目几乎是在Hadoop上运行mapreduce作业的两倍。代码有错误吗?还是我漏掉了什么