public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
protected void setup(Context context) throws IOException, InterruptedException {
// NOTHING
}
protected void map(KEYIN key, VALUEIN value, Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
protected void cleanup(Context context) throws IOException, InterruptedException {
// NOTHING
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
}
将输入键/值对映射到一组中间键/值对。转换后的中间记录不需要与输入记录的类型相同,但是在程序中必须job.setMapperOutputKeyClass和job.setMapperOutputKeyValueClass指定输出的数据类型。
【MR通过每个inputsplit(切分)生成对应的Maptask(map任务),而inputsplit(切分)则由InputFormat生成(默认使用FileInputFormat)】
【org.apache.hadoop.mapreduce.lib.input.FileInputFormat】
【说明:FileInputFormat是InputFormat的文件类型的子类,它提供了getSplit()方法】
FileInputFormat的子类
实现isSplitable()方法,用于判定当前文件是否可切分(压缩格式不可切,除了bzip2)。
protected boolean isSplitable(FileSystem fs, Path filename) {
return true;
}
**mapreduce.input.fileinputformat.split.maxsize:**切分最大大小,默认Long.MAX_VALUE。
**mapreduce.input.fileinputformat.split.minsize:**切分最小大小,默认0。
修改逻辑切分的最小大小,解决由大量小文件产生多个map任务而导致集群运行过慢的情况。
方案为,修改mapreduce.input.fileinputformat.split.maxsize
在FileInputFormat中一个重要的方法:【public InputSplit[] getSplits(JobConf job, int numSplits)】
–>computeSplitSize(blockSize, minSize, maxSize):计算逻辑切分大小 -->splitsize = Math.max(minSize, Math.min(maxSize, blockSize));
【默认切分大小为blocksize,128M】
总结:map数量由InputSplit切分大小来决定,而InputSplit切分计算与(block、maxsize、minsize) 有关系!!!! 【P222】
【org.apache.hadoop.mapreduce.lib.input.TextInputFormat】
【说明:纯文本文件格式化类,它以行进行分割,行的分割标识为回车或换行】
【key为文件的偏移量,而value为行文本值。】
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
final CompressionCodec codec = compressionCodecs.getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
}
说明:创建RecordReader,通过textinputformat.record.delimiter获取记录的默认分隔符,然后调用LineRecordReader(行记录阅读器)
【org.apache.hadoop.mapreduce.lib.input.LineRecordReader】
【说明:将键视为文件中的偏移量,将值视为行。】
–>initialize(InputSplit genericSplit,TaskAttemptContext context)
–>new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);生成split行阅读器对象(SplitLineReader in)
【SplitLineReader extends org.apache.hadoop.util.LineReader】在LineRecordReader类中,Mapper通过调用nextKeyValue()方法将每行记录写到key和value中。
【org.apache.hadoop.util.LineReader.class】
【说明:从输入流中提供行读取器的类,默认按照’\r’或’\n’进行分割。】
in.readLine(Text str, int maxLineLength,int maxBytesToConsume):从InputStream中读取一行到给定的文本。返回读取的字节个数。