Unit testing is the practice of verifying the correctness of an individual module, function, or class. In MapReduce development, writing thorough unit tests for Mappers and Reducers catches problems early and speeds up development. This article uses concrete examples to summarize how to unit test Hadoop Mappers and Reducers with MRUnit. The accompanying code is available on GitHub: https://github.com/liujinguang/hadoop-study.git
In MapReduce, the map and reduce functions are easy to test in isolation, which is a consequence of their functional style. MRUnit (http://incubator.apache.org/mrunit/) is a testing library that makes it easy to pass known inputs to a mapper and to check that a reducer's output matches expectations. Because MRUnit is used together with a standard test execution framework such as JUnit, tests for MapReduce jobs can run as part of the normal development workflow.
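MRUnit ships separately from Hadoop. If the project is built with Maven, a dependency along the following lines pulls it in. This is a sketch, not taken from the repository above: the 1.1.0 version and the hadoop2 classifier are assumptions that should be matched to the Hadoop version in use (the hadoop1 classifier targets the old mapred API).

<dependency>
    <groupId>org.apache.mrunit</groupId>
    <artifactId>mrunit</artifactId>
    <version>1.1.0</version>
    <!-- the classifier selects the Hadoop API generation: hadoop1 or hadoop2 -->
    <classifier>hadoop2</classifier>
    <scope>test</scope>
</dependency>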
The MaxTemperatureMapper class parses the year, temperature, and quality code out of a fixed-format input string; the MRUnit tests later in this article contain example strings that can be used for reference.
package com.jliu.mr.intro;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // the year occupies a fixed-width field of the record
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        // single-character quality code following the temperature field
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
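For reference, this is how the fixed-width offsets used above line up against the sample record from the tests that follow (offsets are 0-based and end-exclusive, matching String.substring). This annotation is ours, added for illustration:

// line.substring(15, 19) -> "1950"   year
// line.charAt(87)        -> '-'      sign of the temperature
// line.substring(87, 92) -> "-0011"  temperature, in tenths of a degree Celsius
// line.substring(92, 93) -> "1"      quality code; [01459] are accepted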
To write a test with MRUnit, first create a MapDriver, set the Mapper class under test, and specify the input and the expected output. In the example below, a single weather record is passed to the mapper as input, and the test checks that the output is the year and temperature read from that record. If the mapper does not produce the expected output, the MRUnit test fails.
package com.jliu.mr.mrunit;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import com.jliu.mr.intro.MaxTemperatureMapper;

public class MaxTemperatureMapperTest {

    @Test
    public void testParsesValidRecord() throws IOException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                // ++++++++++++++++++++++++++++++year ^^^^
                "99999V0203201N00261220001CN9999999N9-00111+99999999999");
                // ++++++++++++++++++++++++++++++temperature ^^^^^
        // a mapper is under test, so use MRUnit's MapDriver
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                // set the mapper under test
                .withMapper(new MaxTemperatureMapper())
                // set the input key and value
                .withInput(new LongWritable(0), value)
                // set the expected output key and value
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
    }

    @Test
    public void testParsesMissingTemperature() throws IOException {
        // Depending on how many times withOutput() is called, MapDriver can be
        // used to check for 0, 1, or more output records. Here the record with
        // a missing temperature is filtered out, so the test verifies that this
        // particular input produces no output at all.
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                // ++++++++++++++++++++++++++++++year ^^^^
                "99999V0203201N00261220001CN9999999N9+99991+99999999999");
                // ++++++++++++++++++++++++++++++temperature ^^^^^
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), value)
                .runTest();
    }
}
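Besides runTest(), MRUnit drivers also offer run(), which returns the output records as a list so they can be inspected with ordinary JUnit assertions. The following variant of the first test, which could be added to MaxTemperatureMapperTest, is a minimal sketch of that style; the method name testParsesValidRecordWithRun is made up for illustration, and it additionally needs imports for java.util.List, org.apache.hadoop.mrunit.types.Pair, and org.junit.Assert.assertEquals:

@Test
public void testParsesValidRecordWithRun() throws IOException {
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
            "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    // run() executes the mapper and returns its output instead of asserting on it
    List<Pair<Text, IntWritable>> output = new MapDriver<LongWritable, Text, Text, IntWritable>()
            .withMapper(new MaxTemperatureMapper())
            .withInput(new LongWritable(0), value)
            .run();
    // exactly one record expected: the year as key, the temperature as value
    assertEquals(1, output.size());
    assertEquals(new Text("1950"), output.get(0).getFirst());
    assertEquals(new IntWritable(-11), output.get(0).getSecond());
}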
To complement the mapper above, the reducer must find the maximum value for a given key.
package com.jliu.mr.intro;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // scan all temperatures for this key (year) and keep the maximum
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
Testing the reducer works the same way as testing the mapper, as the following test shows:
package com.jliu.mr.mrunit;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

import com.jliu.mr.intro.MaxTemperatureReducer;

public class MaxTemperatureReducerTest {

    @Test
    public void testReturnsMaximumIntegerValues() throws IOException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                // set the reducer under test
                .withReducer(new MaxTemperatureReducer())
                // set the input key and list of values
                .withInput(new Text("1950"), Arrays.asList(new IntWritable(10), new IntWritable(5)))
                // set the expected output
                .withOutput(new Text("1950"), new IntWritable(10))
                // run the test
                .runTest();
    }
}
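MRUnit can also exercise the mapper and reducer together with a MapReduceDriver, which runs the map phase, the in-memory shuffle/sort, and the reduce phase in one test. The sketch below is not from the original repository: the class name MaxTemperatureMapReduceTest and the second input record (identical to the first except that its temperature field reads +0022) are made up for illustration.

package com.jliu.mr.mrunit;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Test;

import com.jliu.mr.intro.MaxTemperatureMapper;
import com.jliu.mr.intro.MaxTemperatureReducer;

public class MaxTemperatureMapReduceTest {

    @Test
    public void testMapAndReduceTogether() throws IOException {
        // two 1950 records: temperatures -11 and 22 (tenths of a degree Celsius)
        Text record1 = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                "99999V0203201N00261220001CN9999999N9-00111+99999999999");
        Text record2 = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                "99999V0203201N00261220001CN9999999N9+00221+99999999999");
        new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper())
                .withReducer(new MaxTemperatureReducer())
                .withInput(new LongWritable(0), record1)
                .withInput(new LongWritable(1), record2)
                // after map, shuffle, and reduce, only the maximum survives
                .withOutput(new Text("1950"), new IntWritable(22))
                .runTest();
    }
}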
1. Hadoop: The Definitive Guide, 3rd Edition