Question:

Hadoop MapReduce reducer not starting

杜昆琦
2023-03-14
public class HoursJob {
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: HoursJob <input path> <output path>");
          System.exit(-1);
        }

        Job job = Job.getInstance();
        job.setJarByClass(HoursJob.class);
        job.setJobName("Hours job");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(HoursMapper.class);
        job.setReducerClass(HoursReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(CellWithTotalAmount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        int ret = job.waitForCompletion(true) ? 0 : 1;
        System.exit(ret);
    }
}
public class HoursMapper 
        extends Mapper<LongWritable, Text, IntWritable, CellWithTotalAmount> {
    static double BEGIN_LONG = -74.913585;
    static double BEGIN_LAT = 41.474937;
    static double GRID_LENGTH = 0.011972;
    static double GRID_HEIGHT = 0.008983112;

    @Override
    public void map(LongWritable key, Text value, Mapper.Context context)
            throws IOException, InterruptedException {

        System.out.println("Hello from mapper.");
        String recordString = value.toString();
        try {
            DEBSFullRecord record = new DEBSFullRecord(recordString);
            Date pickupDate = record.getPickup();
            Calendar calendar = GregorianCalendar.getInstance();
            calendar.setTime(pickupDate);
            int pickupHour = calendar.get(Calendar.HOUR_OF_DAY);
            int cellX = (int)
                ((record.getPickupLongitude() - BEGIN_LONG) / GRID_LENGTH) + 1;
            int cellY = (int)
                ((BEGIN_LAT - record.getPickupLatitude()) / GRID_HEIGHT) + 1;

            CellWithTotalAmount hourInfo = 
                new CellWithTotalAmount(cellX, cellY, record.getTotal());
            context.write(new IntWritable(pickupHour), hourInfo);
        } catch (Exception ex) {
            System.out.println(
                "Cannot parse: " + recordString + " due to " + ex);
        }
    }
}
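The grid-cell arithmetic in the mapper can be sanity-checked outside Hadoop. Below is a standalone sketch using the same constants; the sample coordinates are made up for illustration, not taken from the dataset:

```java
public class GridCellDemo {
    static final double BEGIN_LONG = -74.913585;
    static final double BEGIN_LAT = 41.474937;
    static final double GRID_LENGTH = 0.011972;
    static final double GRID_HEIGHT = 0.008983112;

    // Same arithmetic as HoursMapper: 1-based cell indices,
    // x grows eastward from BEGIN_LONG, y grows southward from BEGIN_LAT.
    static int cellX(double longitude) {
        return (int) ((longitude - BEGIN_LONG) / GRID_LENGTH) + 1;
    }

    static int cellY(double latitude) {
        return (int) ((BEGIN_LAT - latitude) / GRID_HEIGHT) + 1;
    }

    public static void main(String[] args) {
        // Roughly midtown Manhattan (sample values, for illustration only)
        System.out.println(cellX(-73.985) + " " + cellY(40.758)); // 78 80
    }
}
```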

Reducer

public class HoursReducer 
        extends Reducer<IntWritable, CellWithTotalAmount, Text, NullWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<CellWithTotalAmount> values, 
            Context context) throws IOException, InterruptedException {
        System.out.println("Hello from reducer.");
        int[][] cellRideCounters = getCellRideCounters(values);
        CellWithRideCount cellWithMostRides = 
            getCellWithMostRides(cellRideCounters);

        int[][] cellTotals = getCellTotals(values);
        CellWithTotalAmount cellWithGreatestTotal = 
            getCellWithGreatestTotal(cellTotals);

        String output = key + " "
            + cellWithMostRides.toString() + " "
            + cellWithGreatestTotal.toString();

        context.write(new Text(output), NullWritable.get());
    }

    //omitted for brevity
}
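One thing worth noting about the reduce method above: it traverses `values` twice (once in `getCellRideCounters`, once in `getCellTotals`), but Hadoop's reduce-side `Iterable` can generally be consumed only once, so the second pass may see no elements. The following plain-Java sketch (no Hadoop classes) reproduces that single-use behavior and the usual fix of caching the values first; in a real reducer each `CellWithTotalAmount` would also need to be copied, since Hadoop reuses the value object between iterations:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SingleUseIterableDemo {
    // An Iterable backed by a single Iterator: a second for-loop sees
    // nothing, similar to the single-pass iterator Hadoop hands to reduce().
    static Iterable<Integer> singleUse(List<Integer> data) {
        Iterator<Integer> it = data.iterator();
        return () -> it;
    }

    static int sum(Iterable<Integer> values) {
        int s = 0;
        for (int v : values) s += v;
        return s;
    }

    public static void main(String[] args) {
        Iterable<Integer> values = singleUse(List.of(1, 2, 3));
        System.out.println(sum(values)); // 6 -- first pass consumes everything
        System.out.println(sum(values)); // 0 -- iterator is already exhausted

        // Fix: copy into a list once, then iterate as often as needed.
        List<Integer> cached = new ArrayList<>();
        for (int v : singleUse(List.of(1, 2, 3))) cached.add(v);
        System.out.println(sum(cached) + " " + sum(cached)); // 6 6
    }
}
```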

Custom Writable class

public class CellWithTotalAmount implements Writable {
    public int cellX;
    public int cellY;
    public double totalAmount;

    public CellWithTotalAmount(int cellX, int cellY, double totalAmount) {
        this.cellX = cellX;
        this.cellY = cellY;
        this.totalAmount = totalAmount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        System.out.println("Writing CellWithTotalAmount");
        out.writeInt(cellX);
        out.writeInt(cellY);
        out.writeDouble(totalAmount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        System.out.println("Reading CellWithTotalAmount");
        cellX = in.readInt();
        cellY = in.readInt();
        totalAmount = in.readDouble();
    }

    @Override
    public String toString() {
        return cellX + " " + cellY + " " + totalAmount;
    }
}
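A detail that commonly keeps a reducer from ever receiving data with a custom `Writable`: Hadoop instantiates value classes reflectively during the shuffle, so `CellWithTotalAmount` also needs a public no-argument constructor alongside the one above. The serialization itself can be checked with plain `java.io` streams; the sketch below round-trips the three fields in the same order as `write`/`readFields`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    // Serializes the three fields the same way write(DataOutput) does.
    static byte[] serialize(int cellX, int cellY, double total) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(cellX);
        out.writeInt(cellY);
        out.writeDouble(total);
        return bytes.toByteArray();
    }

    // readFields() must read back in exactly the order write() wrote.
    static String deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int cellX = in.readInt();
        int cellY = in.readInt();
        double total = in.readDouble();
        return cellX + " " + cellY + " " + total;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(deserialize(serialize(3, 7, 42.5))); // 3 7 42.5
    }
}
```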

1 answer

呼延鹏云
2023-03-14

I think a lot of exceptions are being thrown in the reduce function, so the framework cannot complete the job properly.

    public class HoursReducer 
            extends Reducer<IntWritable, CellWithTotalAmount, Text, NullWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<CellWithTotalAmount> values, 
                Context context) throws IOException, InterruptedException {
            System.out.println("Hello from reducer.");
            try {
                int[][] cellRideCounters = getCellRideCounters(values);
                // Guard before using the result; see the notes below.
                if (cellRideCounters[0].length > 0) {
                    CellWithRideCount cellWithMostRides = 
                        getCellWithMostRides(cellRideCounters);

                    int[][] cellTotals = getCellTotals(values);
                    CellWithTotalAmount cellWithGreatestTotal = 
                        getCellWithGreatestTotal(cellTotals);

                    String output = key + " "
                        + cellWithMostRides.toString() + " "
                        + cellWithGreatestTotal.toString();

                    context.write(new Text(output), NullWritable.get());
                }
            } catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }
    }
  • Add a try-catch in the reduce function to catch exceptions
  • Return from the function inside the catch block
  • Also add an if statement before calling getCellWithMostRides(..); I think the problem is there. Fill in the if condition as you see fit. I made a guess and filled it in accordingly; if it doesn't work for you, change it however you like.
