当前位置: 首页 > 工具软件 > Mapper > 使用案例 >

Mapper分析

宗政坚白
2023-12-01

Mapper分析

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public abstract class Context implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  protected void setup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  protected void map(KEYIN key, VALUEIN value, Context context) 
  		throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  protected void cleanup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

1. 作用:

将输入键/值对映射到一组中间键/值对。转换后的中间记录不需要与输入记录的类型相同,但是在程序中必须job.setMapperOutputKeyClass和job.setMapperOutputKeyValueClass指定输出的数据类型。

【MR通过每个inputsplit(切分)生成对应的Maptask(map任务),而inputsplit(切分)则由InputFormat生成(默认使用FileInputFormat)】

2. Mapper函数:(源码中注释翻译)

  • **setup():**在任务开始时调用一次,用于加载资源【如:mysql的Driver创建】
  • **map():**为inputsplit中的每个键/值对调用一次,大多数应用程序应该覆盖它,但是默认值是identity函数
  • **cleanup():**任务结束时调用一次,释放资源【关闭mysql连接】
  • **run():**专家用户可以重写此方法,以便对执行mapper。

3. FileInputFormat类

org.apache.hadoop.mapreduce.lib.input.FileInputFormat

【说明:FileInputFormat是InputFormat的文件类型的子类,它提供了getSplit()方法】

  • FileInputFormat的子类

    实现isSplitable()方法,用于判定当前文件是否可切分(压缩格式不可切,除了bzip2)。

    protected boolean isSplitable(FileSystem fs, Path filename) {
    return true;
    }

    • 属性:【只能在底层代码中修改】

    **mapreduce.input.fileinputformat.split.maxsize:**切分最大大小,默认Long.MAX_VALUE。

**mapreduce.input.fileinputformat.split.minsize:**切分最小大小,默认0。
修改逻辑切分的最小大小,解决由大量小文件产生多个map任务而导致集群运行过慢的情况。

  • 【将多个小文件当成单个大文件来逻辑切分,处理数据】

方案为,修改mapreduce.input.fileinputformat.split.maxsize
在FileInputFormat中一个重要的方法:【public InputSplit[] getSplits(JobConf job, int numSplits)】
–>computeSplitSize(blockSize, minSize, maxSize):计算逻辑切分大小

​ -->splitsize = Math.max(minSize, Math.min(maxSize, blockSize));

【默认切分大小为blocksize,128M】
总结:map数量由InputSplit切分大小来决定,而InputSplit切分计算与(block、maxsize、minsize) 有关系!!!! 【P222】

4. TextInputFormat类

org.apache.hadoop.mapreduce.lib.input.TextInputFormat

【说明:纯文本文件格式化类,它以行进行分割,行的分割标识为回车或换行】
【key为文件的偏移量,而value为行文本值。】

public class TextInputFormat extends FileInputFormat<LongWritable, Text>
  implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;
  
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }
  
  protected boolean isSplitable(FileSystem fs, Path file) {
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

  public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    String delimiter = job.get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    }
    return new LineRecordReader(job, (FileSplit) genericSplit,
        recordDelimiterBytes);
  }
}

说明:创建RecordReader,通过textinputformat.record.delimiter获取记录的默认分隔符,然后调用LineRecordReader(行记录阅读器)

5. LineRecordReader类

org.apache.hadoop.mapreduce.lib.input.LineRecordReader

【说明:将键视为文件中的偏移量,将值视为行。】

–>initialize(InputSplit genericSplit,TaskAttemptContext context)
–>new SplitLineReader(codec.createInputStream(fileIn,decompressor), job, this.recordDelimiterBytes);

生成split行阅读器对象(SplitLineReader in)
【SplitLineReader extends org.apache.hadoop.util.LineReader】

在LineRecordReader类中,Mapper通过调用nextKeyValue()方法将每行记录写到key和value中。
【org.apache.hadoop.util.LineReader.class】
【说明:从输入流中提供行读取器的类,默认按照’\r’或’\n’进行分割。】
in.readLine(Text str, int maxLineLength,int maxBytesToConsume):从InputStream中读取一行到给定的文本。返回读取的字节个数。

 类似资料: