当前位置: 首页 > 教程 > Hadoop >

7.0 MapReduce 编程

精华
小牛编辑
130浏览
2023-03-14

在学习了 MapReduce 的使用之后,我们已经可以处理 Word Count 这类统计和检索任务,但是客观上 MapReduce 可以做的事情还有很多。

MapReduce 主要是依靠开发者通过编程来实现功能的,开发者可以通过实现 Map 和 Reduce 相关的方法来进行数据处理。

为了简单的展示这一过程,我们将手工编写一个 Word Count 程序。

注意:MapReduce 依赖 Hadoop 的库,但由于本教程使用的 Hadoop 运行环境是 Docker 容器,难以部署开发环境,所以真实的开发工作(包含调试)将需要一个运行 Hadoop 的计算机。在这里我们仅学习已完成程序的部署。

MyWordCount.java 文件代码

/**
 * 引用声明
 *
 */

package cn.xnip.hadoop ;
import java.io.IOException ;
import java.util.* ;
import org.apache.hadoop.fs.Path ;
import org.apache.hadoop.io.* ;
import org.apache.hadoop.mapred.* ;
/**
 * 与 `Map` 相关的方法
 */

class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable > {
    private final static IntWritable one = new IntWritable ( 1 ) ;
    private Text word = new Text ( ) ;
    public void map (LongWritable key,
               Text value,
               OutputCollector <Text, IntWritable > output,
               Reporter reporter )
          throws IOException {
      String line = value. toString ( ) ;
      StringTokenizer tokenizer = new StringTokenizer (line ) ;
      while (tokenizer. hasMoreTokens ( ) ) {
         word. set (tokenizer. nextToken ( ) ) ;
         output. collect (word, one ) ;
      }
    }
}
/**
 * 与 `Reduce` 相关的方法
 */

class Reduce extends MapReduceBase implements Reducer <Text, IntWritable, Text, IntWritable > {
    public void reduce (Text key,
                  Iterator <IntWritable > values,
                  OutputCollector <Text, IntWritable > output,
                  Reporter reporter )
          throws IOException {
      int sum = 0 ;
      while (values. hasNext ( ) ) {
         sum += values. next ( ). get ( ) ;
      }
      output. collect (key, new IntWritable (sum ) ) ;
    }
}
public class MyWordCount {
    public static void main ( String [ ] args ) throws Exception {
      JobConf conf = new JobConf (MyWordCount. class ) ;
      conf. setJobName ( "my_word_count" ) ;
      conf. setOutputKeyClass (Text. class ) ;
      conf. setOutputValueClass (IntWritable. class ) ;
      conf. setMapperClass ( Map. class ) ;
      conf. setCombinerClass (Reduce. class ) ;
      conf. setReducerClass (Reduce. class ) ;
      conf. setInputFormat (TextInputFormat. class ) ;
      conf. setOutputFormat (TextOutputFormat. class ) ;
      // 第一个参数表示输入
      FileInputFormat. setInputPaths (conf, new Path (args [ 0 ] ) ) ;
      // 第二个输入参数表示输出
      FileOutputFormat. setOutputPath (conf, new Path (args [ 1 ] ) ) ;
      JobClient. runJob (conf ) ;
    }
}

请将此 Java 文件的内容保存到 NameNode 容器中去,建议位置:

/home/hadoop/MyWordCount/cn/xnip/hadoop/MyWordCount.java

注意:根据当前情况,有的 Docker 环境中安装的 JDK 不支持中文,所以保险起见,请去掉以上代码中的中文注释。

进入目录:

cd /home/hadoop/MyWordCount

编译:

javac -classpath ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.4.jar -classpath ${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.4.jar cn/xnip/hadoop/MyWordCount.java

打包:

jar -cf my-word-count.jar cn

执行:

hadoop jar my-word-count.jar cn.xnip.hadoop.MyWordCount /wordcount/input /wordcount/output2

查看结果:

hadoop fs -cat /wordcount/output2/part-00000

输出:

I       4
hadoop  2
like    2
love    2
xnip  2