7.0 MapReduce 编程
精华
小牛编辑
130浏览
2023-03-14
在学习了 MapReduce 的使用之后,我们已经可以处理 Word Count 这类统计和检索任务,但是客观上 MapReduce 可以做的事情还有很多。
MapReduce 主要是依靠开发者通过编程来实现功能的,开发者可以通过实现 Map 和 Reduce 相关的方法来进行数据处理。
为了简单的展示这一过程,我们将手工编写一个 Word Count 程序。
注意:MapReduce 依赖 Hadoop 的库,但由于本教程使用的 Hadoop 运行环境是 Docker 容器,难以部署开发环境,所以真实的开发工作(包含调试)将需要一个运行 Hadoop 的计算机。在这里我们仅学习已完成程序的部署。
MyWordCount.java 文件代码
/**
* 引用声明
*
*/
package cn.xnip.hadoop ;
import java.io.IOException ;
import java.util.* ;
import org.apache.hadoop.fs.Path ;
import org.apache.hadoop.io.* ;
import org.apache.hadoop.mapred.* ;
/**
* 与 `Map` 相关的方法
*/
class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable > {
private final static IntWritable one = new IntWritable ( 1 ) ;
private Text word = new Text ( ) ;
public void map (LongWritable key,
Text value,
OutputCollector <Text, IntWritable > output,
Reporter reporter )
throws IOException {
String line = value. toString ( ) ;
StringTokenizer tokenizer = new StringTokenizer (line ) ;
while (tokenizer. hasMoreTokens ( ) ) {
word. set (tokenizer. nextToken ( ) ) ;
output. collect (word, one ) ;
}
}
}
/**
* 与 `Reduce` 相关的方法
*/
class Reduce extends MapReduceBase implements Reducer <Text, IntWritable, Text, IntWritable > {
public void reduce (Text key,
Iterator <IntWritable > values,
OutputCollector <Text, IntWritable > output,
Reporter reporter )
throws IOException {
int sum = 0 ;
while (values. hasNext ( ) ) {
sum += values. next ( ). get ( ) ;
}
output. collect (key, new IntWritable (sum ) ) ;
}
}
public class MyWordCount {
public static void main ( String [ ] args ) throws Exception {
JobConf conf = new JobConf (MyWordCount. class ) ;
conf. setJobName ( "my_word_count" ) ;
conf. setOutputKeyClass (Text. class ) ;
conf. setOutputValueClass (IntWritable. class ) ;
conf. setMapperClass ( Map. class ) ;
conf. setCombinerClass (Reduce. class ) ;
conf. setReducerClass (Reduce. class ) ;
conf. setInputFormat (TextInputFormat. class ) ;
conf. setOutputFormat (TextOutputFormat. class ) ;
// 第一个参数表示输入
FileInputFormat. setInputPaths (conf, new Path (args [ 0 ] ) ) ;
// 第二个输入参数表示输出
FileOutputFormat. setOutputPath (conf, new Path (args [ 1 ] ) ) ;
JobClient. runJob (conf ) ;
}
}
* 引用声明
*
*/
package cn.xnip.hadoop ;
import java.io.IOException ;
import java.util.* ;
import org.apache.hadoop.fs.Path ;
import org.apache.hadoop.io.* ;
import org.apache.hadoop.mapred.* ;
/**
* 与 `Map` 相关的方法
*/
class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable > {
private final static IntWritable one = new IntWritable ( 1 ) ;
private Text word = new Text ( ) ;
public void map (LongWritable key,
Text value,
OutputCollector <Text, IntWritable > output,
Reporter reporter )
throws IOException {
String line = value. toString ( ) ;
StringTokenizer tokenizer = new StringTokenizer (line ) ;
while (tokenizer. hasMoreTokens ( ) ) {
word. set (tokenizer. nextToken ( ) ) ;
output. collect (word, one ) ;
}
}
}
/**
* 与 `Reduce` 相关的方法
*/
class Reduce extends MapReduceBase implements Reducer <Text, IntWritable, Text, IntWritable > {
public void reduce (Text key,
Iterator <IntWritable > values,
OutputCollector <Text, IntWritable > output,
Reporter reporter )
throws IOException {
int sum = 0 ;
while (values. hasNext ( ) ) {
sum += values. next ( ). get ( ) ;
}
output. collect (key, new IntWritable (sum ) ) ;
}
}
public class MyWordCount {
public static void main ( String [ ] args ) throws Exception {
JobConf conf = new JobConf (MyWordCount. class ) ;
conf. setJobName ( "my_word_count" ) ;
conf. setOutputKeyClass (Text. class ) ;
conf. setOutputValueClass (IntWritable. class ) ;
conf. setMapperClass ( Map. class ) ;
conf. setCombinerClass (Reduce. class ) ;
conf. setReducerClass (Reduce. class ) ;
conf. setInputFormat (TextInputFormat. class ) ;
conf. setOutputFormat (TextOutputFormat. class ) ;
// 第一个参数表示输入
FileInputFormat. setInputPaths (conf, new Path (args [ 0 ] ) ) ;
// 第二个输入参数表示输出
FileOutputFormat. setOutputPath (conf, new Path (args [ 1 ] ) ) ;
JobClient. runJob (conf ) ;
}
}
请将此 Java 文件的内容保存到 NameNode 容器中去,建议位置:
/home/hadoop/MyWordCount/cn/xnip/hadoop/MyWordCount.java
注意:根据当前情况,有的 Docker 环境中安装的 JDK 不支持中文,所以保险起见,请去掉以上代码中的中文注释。
进入目录:
cd /home/hadoop/MyWordCount
编译:
javac -classpath ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.4.jar -classpath ${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.4.jar cn/xnip/hadoop/MyWordCount.java
打包:
jar -cf my-word-count.jar cn
执行:
hadoop jar my-word-count.jar cn.xnip.hadoop.MyWordCount /wordcount/input /wordcount/output2
查看结果:
hadoop fs -cat /wordcount/output2/part-00000
输出:
I 4 hadoop 2 like 2 love 2 xnip 2