Question:

Hadoop: reduce happens between "Starting flush of map output" and "Finished spill" before the map completes

丌官子安
2023-03-14

Input:

file01.txt

file02.txt

This means both tests have two input paths to process. I printed some log messages to try to understand the Map/Reduce process.

Look at what happens between "Starting flush of map output" and "Finished spill 0": the wordcount job shows two extra rounds of reduce output before the final reduce, while the secondsort job finishes with just one reduce at the end. Since these programs are very "small", I don't think io.sort.mb/io.sort.factor affects this.

Can anyone explain this?

wordcount log:

[hadoop@localhost ~]$ hadoop jar test.jar com.abc.example.test wordcount output
13/08/07 18:14:05 INFO mapred.FileInputFormat: Total input paths to process : 2
13/08/07 18:14:06 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 18:14:06 INFO util.ProcessTree: setsid exited with exit code 0
...
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
Mapper: 0 | Hello Hadoop GoodBye Hadoop
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
Reduce: GoodBye
Reduce: GoodBye | 1
Reduce: Hadoop
Reduce: Hadoop | 1
Reduce: Hadoop | 1
Reduce: Hello
Reduce: Hello | 1
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file02.txt:0+28
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 18:14:06 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4d16ffed
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output**
Reduce: Bye
Reduce: Bye | 1
Reduce: Hello
Reduce: Hello | 1
Reduce: world
Reduce: world | 1
Reduce: world | 1
13/08/07 18:14:06 INFO mapred.MapTask: **Finished spill 0**
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file01.txt:0+22
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 18:14:06 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1f3c0665
13/08/07 18:14:06 INFO mapred.LocalJobRunner: 
13/08/07 18:14:06 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 18:14:06 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 77 bytes
13/08/07 18:14:06 INFO mapred.LocalJobRunner: 
Reduce: Bye
Reduce: Bye | 1
Reduce: GoodBye
Reduce: GoodBye | 1
Reduce: Hadoop
Reduce: Hadoop | 2
Reduce: Hello
Reduce: Hello | 1
Reduce: Hello | 1
Reduce: world
Reduce: world | 2
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
...
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce input groups=5
13/08/07 18:14:07 INFO mapred.JobClient:     Combine output records=6
13/08/07 18:14:07 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Reduce output records=5
13/08/07 18:14:07 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/08/07 18:14:07 INFO mapred.JobClient:     Map output records=8

secondsort log:

[hadoop@localhost ~]$ hadoop jar example.jar com.abc.example.example secondsort output
13/08/07 17:00:11 INFO input.FileInputFormat: Total input paths to process : 2
13/08/07 17:00:11 WARN snappy.LoadSnappy: Snappy native library not loaded
13/08/07 17:00:12 INFO mapred.JobClient: Running job: job_local_0001
13/08/07 17:00:12 INFO util.ProcessTree: setsid exited with exit code 0
13/08/07 17:00:12 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@57d94c7b
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
Map: 0 | 5 49
Map: 5 | 9 57
Map: 10 | 19 46
Map: 16 | 3 21
Map: 21 | 9 48
Map: 26 | 7 57
... 
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/08/07 17:00:12 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@f3a1ea1
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680
Map: 0 | 20 21
Map: 6 | 50 51
Map: 12 | 50 52
Map: 18 | 50 53
Map: 24 | 50 54
...
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output**
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0**
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
13/08/07 17:00:12 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@cee4e92
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Merger: Merging 2 sorted segments
13/08/07 17:00:12 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 1292 bytes
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
Reduce: 0:35 -----------------
Reduce: 0:35 | 35
Reduce: 0:54 -----------------
... 
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/08/07 17:00:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output
13/08/07 17:00:12 INFO mapred.LocalJobRunner: reduce > reduce
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/08/07 17:00:13 INFO mapred.JobClient:  map 100% reduce 100%
13/08/07 17:00:13 INFO mapred.JobClient: Job complete: job_local_0001
13/08/07 17:00:13 INFO mapred.JobClient: Counters: 22
13/08/07 17:00:13 INFO mapred.JobClient:   File Output Format Counters 
13/08/07 17:00:13 INFO mapred.JobClient:     Bytes Written=4787
...
13/08/07 17:00:13 INFO mapred.JobClient:     SPLIT_RAW_BYTES=236
13/08/07 17:00:13 INFO mapred.JobClient:     Reduce input records=92

WordCount:

// WordCount driver (old org.apache.hadoop.mapred API). The Map and Reduce
// classes are inner classes of test and are not shown here.
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(test.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class); // Reduce doubles as the combiner
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}

SecondSort:

// Secondary-sort driver (new org.apache.hadoop.mapreduce API). Map, Reduce,
// IntPair, FirstPartitioner and GroupingComparator are user-defined classes
// that are not shown. Note: no combiner is configured for this job.
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
{
    Configuration conf = new Configuration();
    Job job = new Job(conf, "secondarysort");
    job.setJarByClass(example.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(GroupingComparator.class);

    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

1 answer

法和硕
2023-03-14

Combine output records=6

That says it all: your reduce function is used both as the combiner and as the reducer, so what you are seeing is combiner output. The combiner is (sometimes) invoked while the map output is being spilled.
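
To see this directly in the log, one option (my own sketch, not from the original post; the Combine class name is made up) is to register a combiner that aggregates exactly like Reduce but prints a different prefix, so map-side spill output can be told apart from reduce-phase output:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical combiner: same aggregation a WordCount reducer performs,
// but it logs "Combine:" so invocations during the map-side spill are
// clearly labeled in the output.
public static class Combine extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get(); // sum the partial counts for this key
        }
        System.out.println("Combine: " + key + " | " + sum);
        output.collect(key, new IntWritable(sum));
    }
}

Registered with conf.setCombinerClass(Combine.class); instead of Reduce.class, the lines between "Starting flush of map output" and "Finished spill 0" would print as "Combine: ..." while the real reduce phase still prints "Reduce: ...".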

I think you should add your code, at least the part in main(), to show us how the job is set up. That would make your question easier to answer.
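
And given the wordcount driver shown above, a minimal experiment (again my own sketch; everything is unchanged except the combiner registration) is to leave the combiner unset. The "Reduce:" output between "Starting flush of map output" and "Finished spill 0" should then disappear, because nothing invokes the reduce function at spill time anymore:

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(test.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    // conf.setCombinerClass(Reduce.class); // omitted: no combine pass during the spill
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}

This also matches the secondsort job: it never sets a combiner, which is why its log goes straight from "Starting flush of map output" to "Finished spill 0" with no reduce output in between.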
