5.3.1-MapReduce

优质
小牛编辑
127浏览
2023-12-01

1.1 词频统计

1.2 计算每年的最高温度

1.2.1 实现步骤

  • 定义 Map 类(或静态内部类),继承 Mapper 接口,实现 map 方法。

    // map 方法输入为
    // 字符串偏移 :LongWritable
    // 一行文本:Text
    // 输出为:
    // year: Text
    // temperature : IntWritable
    public static class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
          // 使用 @Override 注解,确保覆盖了 Mapper 中定义的 map 方法。
          @Override
          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line=value.toString();
              String year=line.substring(15,19);
              String temperatureStr=line.substring(87,92);
              int temperature;
              if(!missing(temperatureStr)){
                  temperature=Integer.parseInt(temperatureStr);
                  context.write(new Text(year),new IntWritable(temperature));
              }
          }
    
          private boolean missing(String str){
              return str.equals("+9999");
          }
      }
    
  • 定义 Reduce 类(或静态内部类),继承 Reducer 接口,实现 reduce 方法。
      // reduce 方法输入为
      // year :Text
      // values:key(year) 相同的 temperature 的 Iterable 集合。
      // 输出为:
      // year: Text
      // 某一 key(year) 下的 temperature 的最大值: IntWritable
      public static class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
          int maxValue=Integer.MIN_VALUE;
          public void reduce(Text key, Iterable<IntWritable> values,
                             Context context) throws IOException, InterruptedException {
              while (values.iterator().hasNext()){
                  maxValue=Math.max(values.iterator().next().get(),maxValue);
              }
              context.write(key,new IntWritable(maxValue));
          }
      }
    
  • 定义 App 类,设置 job 的 mapper 类、reduce 类、combiner 类、reduce 输出 key 的类型、reduce 输出 value 的类型、reduce 数、文件输入路径、文件输出路径。
    public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          Job job = Job.getInstance(conf, "max temperature");
          //设置 mapper 类
          job.setMapperClass(MaxTemperatureMapper.class);
          //设置 combiner 类
          job.setCombinerClass(MaxTemperatureReducer.class);
          //设置 reducer 类
          job.setReducerClass(MaxTemperatureReducer.class);
          //设置 Reduce 输出 Key 的类型
          job.setOutputKeyClass(Text.class);
          //设置 Reduce 输出 Value 的类型
          job.setOutputValueClass(IntWritable.class);
          //设置 reduce 数
          job.setNumReduceTasks(2);
          //设置文件输入路径。
          FileInputFormat.addInputPath(job,
                  new Path("/Users/dreamaker/Downloads/hadoop-2.7.1/input1"));
          //设置文件输出路径。
          FileOutputFormat.setOutputPath(job,
                  new Path("/Users/dreamaker/Downloads/hadoop-2.7.1/output1"));
          //执行任务。
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    

    1.3 全排序

    如何用 Hadoop 产生全局有序的文件?最简单的方法是使用一个分区(a single partition)。替代方案:首先,创建一系列排好序的文件;其次,串联这些文件;最后,生成一个全局排序的文件。主要的思路是使用一个 partitioner 来描述全局排序的输出。

    1.3.1 实现步骤

  • 定义 App 类,相对于普通 App 添加以下配置。
          // 设置输入文件类型
          job.setInputFormatClass(SequenceFileInputFormat.class);
          //设置全局有序分区
          job.setPartitionerClass(TotalOrderPartitioner.class);
          //设置文件输入路径。
          InputSampler.RandomSampler<IntWritable, IntWritable> sampler = new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1, 10, 4);
          // 设置 partition 文件写入路径,本地文件系统。
          TotalOrderPartitioner.setPartitionFile(conf,new Path("_partition2.lst"));
          InputSampler.writePartitionFile(job,sampler);
    

    1.4 二次排序

  • 定义 CombineKey 类,实现 WritableComparable 接口,实现子接口 Writable 的 write 和 readFields 方法,和子接口 Comparable 的 compareTo 方法。 ``` public class CombineKey implements WritableComparable{ private int year; private int temperature;

    //private static Logger logger=LoggerFactory.getLogger(CombineKey.class); public CombineKey() { }

    public CombineKey(int year, int temperature) {

      this.year = year;
      this.temperature = temperature;
    

    }

    public int getYear() {

      return year;
    

    }

public int compareTo(CombineKey o) {
    if(year==o.year){
        // 再按温度降序排序
        return o.temperature-temperature;
    }else {
        // 首先按年份升序排序
        return year-o.year;
    }
}

public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(year);
    //logger.info("Year:"+year);
    dataOutput.writeInt(temperature);
}

public void readFields(DataInput dataInput) throws IOException {
    year=dataInput.readInt();
    temperature=dataInput.readInt();
}
@Override
public String toString(){
    return year+" "+temperature;
}

}

* 定义 YearPartitioner 继承 Partitioner 类,并重写 getPartition 方法。

public class YearPartitioner extends Partitioner { @Override public int getPartition(CombineKey key, NullWritable value, int numPartitions) { return key.getYear()%numPartitions; } }

* 定义 YearGroupComparator 继承 WritableComparator 类,重写其 compare 方法,将同一年的数据放入同一 Iterable 容器。

public class YearGroupComparator extends WritableComparator { YearGroupComparator(){ super(CombineKey.class,true); }

@Override
public int compare(WritableComparable c1, WritableComparable c2){
    return ((CombineKey)c1).getYear()-((CombineKey)c2).getYear();

}

}

* 定义 Mapper 类。

public class MaxTempSecSortMapper extends Mapper { //public MaxTempSecSortMapper(){} @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); int year=Integer.valueOf(line.substring(15,19)); String temperatureStr=line.substring(87,92); if(!missing(temperatureStr)){ context.write(new CombineKey(year,Integer.parseInt(temperatureStr)),NullWritable.get()); }

}

private boolean missing(String str){
    return str.equals("+9999");
}

}

* 定义 Reduce 类。

public class MaxTempSecReducer extends Reducer { @Override public void reduce(CombineKey key,Iterable values, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } }

* 定义 App 类,设置 job 的 mapper 类、reduce 类、reduce 输出 key 的类型、reduce 输出 value 的类型、分区器、分组器、reduce 数、文件输入路径、文件输出路径。

public class MaxTempSecSortDemo { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "sec sort"); //设置 mapper 类 job.setMapperClass(MaxTempSecSortMapper.class); //设置 reducer 类 job.setReducerClass(MaxTempSecReducer.class); //设置 Reduce 输出 Key 的类型 job.setOutputKeyClass(CombineKey.class); //设置 Reduce 输出 Value 的类型 job.setOutputValueClass(NullWritable.class); //设置分区器类型。 job.setPartitionerClass(YearPartitioner.class); //设置分组器,值相同的记录作为同一个 Iterable 容器发给 Reduce job.setGroupingComparatorClass(YearGroupComparator.class); // 设置 Reduce 数 job.setNumReduceTasks(2); //设置文件输入路径。 FileInputFormat.addInputPath(job, new Path("/input1")); //设置文件输出路径。 FileOutputFormat.setOutputPath(job, new Path("/output1")); //执行任务。 System.exit(job.waitForCompletion(true) ? 0 : 1); } }

## 1.5 大小表连接
### 1.5.1 实现步骤
* 定义 Mapper 类,重写 map 方法。

public static class MyMapper extends Mapper{ @Override public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { String line=value.toString(); String[] orderInfo=line.split("\t"); int cid=Integer.parseInt(orderInfo[3]); context.write(new IntWritable(cid),value); } }

* 定义 Reducer 类,实现 setup 和 reduce 方法。

public static class MyReducer extends Reducer{ private Map map=new HashMap(); @Override
/**

     * setup 方法将 cumstomer 数据缓存进 map。
     */ 
    public void setup(Context context) throws IOException {
        Configuration configuration=context.getConfiguration();
        FileSystem fs=FileSystem.get(configuration);
        FSDataInputStream fis=fs.open(new Path(configuration.get("customersdir")));
        BufferedReader reader=new BufferedReader(new InputStreamReader(fis));
        String line;
        while ((line=reader.readLine())!=null){
            map.put(new Integer(line.split("\t")[0]),line);
        }
        reader.close();
        //fis.close();
    }
    @Override
    public void reduce(IntWritable key,Iterable<Text> values,
                       Context context) throws IOException, InterruptedException {
        int cid=key.get();
        for (Text order:values){
            String customerInfo=map.get(cid);
            context.write(new Text(order.toString()+" "+customerInfo),NullWritable.get());
        }
    }
}
* 定义 App 类,设置 job 的 mapper 类、reduce 类、reduce 输出 key 的类型、reduce 输出 value 的类型、customer 数据路径、reduce 数、文件输入路径、文件输出路径。

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("customersdir","/input2/customers"); Job job = Job.getInstance(conf, "join demo01"); //设置 mapper 类 job.setMapperClass(MyMapper.class); //设置 reducer 类 job.setReducerClass(MyReducer.class); //设置 Map 输出 Key 的类型 job.setMapOutputKeyClass(IntWritable.class); //设置 Map 输出 Value 的类型 job.setMapOutputValueClass(Text.class); //设置 Reduce 输出 Key 的类型 job.setOutputKeyClass(Text.class); //设置 Reduce 输出 Value 的类型 job.setOutputValueClass(NullWritable.class); //设置 reduce 数。 job.setNumReduceTasks(2); //job.setPartitionerClass(MyPartitioner.class); //设置文件输入路径。 FileInputFormat.addInputPath(job, new Path("/input3")); //设置文件输出路径。 FileOutputFormat.setOutputPath(job, new Path("/output1")); //执行任务。 System.exit(job.waitForCompletion(true) ? 0 : 1);

}
## 1.6 大大表连接
基于二次排序实现。
### 1.6.1 实现步骤
* 定义 CombineKey 类,实现 WritableComparable 接口,实现子接口 Writable 的 write 和 readFields 方法,和子接口 Comparable 的 compareTo 方法,对于 cid 相同的数据,仅有一条来自 customer,且置于 Iterable 容器头。

public class CombineKey implements WritableComparable{ private int flag; private int cid;

//private static Logger logger=LoggerFactory.getLogger(CombineKey.class);
public CombineKey() {
}

public CombineKey(int flag, int cid) {
    this.flag = flag;
    this.cid = cid;
}

public int getCid() {
    return cid;
}

public int compareTo(CombineKey o) {
    if(flag==o.flag){
        // 再按cid升序排序
        return cid-o.cid;
    }else {
        // 首先按 flag 升序排序
        return flag-o.flag;
    }
}
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(flag);
    dataOutput.writeInt(cid);
}

public void readFields(DataInput dataInput) throws IOException {
    flag=dataInput.readInt();
    cid=dataInput.readInt();
}
@Override
public String toString(){
    return flag+" "+cid;
}

}

* 定义 CidPartitioner 继承 Partitioner 类,并重写 getPartition 方法。

public class CidPartitioner extends Partitioner { public int getPartition(CombineKey key, Text value, int numPartitions) { return key.getCid()%numPartitions; } }

* 定义 CidGroupComparator 继承 WritableComparator 类,重写其 compare 方法,将 cid 相同的数据放入同一 Iterable 容器。

public class CidGroupComparator extends WritableComparator { public CidGroupComparator(){ super(CombineKey.class,true); } @Override public int compare(WritableComparable c1, WritableComparable c2){ return ((CombineKey)c1).getCid()-((CombineKey)c2).getCid(); } }

* 定义 Mapper 类,将 customer 和 order 的 cid 与 flag(customer 数据 flag 取为 0,order 数据 flag 取为 1) 的组合作为 key。

public class MyMapper extends Mapper { @Override public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { String line=value.toString(); int flag=0; int cid; FileSplit split=(FileSplit) context.getInputSplit(); String path=split.getPath().toString(); if(path.contains("customers")){ flag=0; cid=Integer.parseInt(line.split("\t")[0]); }else { flag=1; cid=Integer.parseInt(line.split("\t")[3]); } context.write(new CombineKey(flag,cid),value); } }

* 定义 Reducer 类,将 cid 相同的数据按 flag 进行 join。

public class MyReducer extends Reducer { @Override public void reduce(CombineKey key,Iterable values, Context context) throws IOException, InterruptedException { Iterator ite=values.iterator(); String customerInfo=((Iterator) ite).next().toString(); while (ite.hasNext()){ context.write(new Text(ite.next().toString().toString() +" "+customerInfo),NullWritable.get()); }

}

} ```

1.7 大大表连接,分区键相同,分区数相同连接

1.8 链条

2. 工作机制

任务的分配

shuffle 和排序

MapReduce 确保每个 reducer 的输入都按键排序。系统执行排序的过程——将 map 输出作为输入传给 reducer——称为 shuffle。 Partition 函数默认对 key 值进行 hash 再对 Reduce 数进行取模确定数据该发送给哪个 Reduce。 Map进程 将缓存中的数据周期性写入本地,并通知 Master 节点文件所在位置,Reduce 进程从 Master 节点获取文件所在位置,并使用 RPC 读取文件。

map 端

mapper 一般执行输入格式的解析、投影(选择相关的字段)和过滤(去除无关记录)。对于线性作业,最简单的方法是一个接一个运行;对于复杂的非线性结构,使用 JobControl 来指定作业之间的依赖关系。

缓存

map 产生输出时,利用缓冲的方式写到内存,并出于效率的考虑进行预排序。默认情况下,缓冲区大小为 100 MB,此值可以通过改变 io.sort.mb 属性来调整。一旦缓冲内容达到阈值(io.sort.spill.percent,默认为 80%),一个后台线程便开始把内容写(spill)到磁盘中。在写磁盘的过程中,map 输出将继续被写到缓冲区,但如果在此期间缓冲区被填满,map 任务会阻塞知道写磁盘的过程完成。写磁盘将按轮询的方式写到 mapred.local.dir属性指定的目录中。

分区

在写磁盘之前,线程首先根据数据最终要传送到的 reducer 把数据划分到相应的分区,在每个分区中,后台线程按键进行内排序,如果有一个 combiner,它会在排序后的输出上运行。

合并

一旦内存缓冲区达到溢出写对阈值,就会新建一个溢出写文件,因此在 map 任务完成最后一个记录输出之后,会有几个溢出写文件。在任务完成之前,溢出写文件被合并成一个已分区且已排序对输出文件。配置 io.sort.factor 控制一次合并多少个流,默认值为 10。

压缩

写磁盘压缩 map 输出,会让写磁盘的速度更快,节省磁盘空间,并且减少传给 reducer 的数据量。默认情况下,输出是不压缩的,但只要将mapred.compress.map.output 设置为 true,并使用 mapred.map.output.compression.codec 来指定压缩库。

数据传输

map 输出文件位于 map 任务的 tasktracker 的本地磁盘,reducer 通过 HTTP 方式得到 map 输出文件的分区。

reduce 端

复制阶段

reduce 任务需要集群上若干个 map 任务的输出作为其分区文件。每个 map 任务的完成时间不同,因此只要有一个 map 任务完成,reduce 任务就开始复制其输出,这就是 reduce 任务的复制阶段,reduce 任务有少量复制线程,默认值是 5 个线程,可以通过 mapred.reduce.paraller.copies 属性来指定,因此能并行得到 map 输出。

reducer 如何知道从那个 tasktarcker 取得文件输出

map 任务完成后,它们会通知其父 tasktracker 状态已更新,tasktracker 通知 jobtracker 进程,通知通过心跳机制传输。reducer 中的一个线程定期询问 jobtracker,以便获得 map 的输出位置,知道它获得所有输出位置。

如果 map 输出足够下,则会被复制到 reduce tasktracker 到内存(缓冲区大小由 mapred.job.shuffle.input.buffer.percent )

合并阶段

每台机器上有个 TaskTracker 进程,管理机器上的资源,当资源空闲时,通知 JobTracker 根据分布 在该节点上启动 Map 进程或 Reduce 进程。

3. 作业调优

调优项调优方法
map 数Map 数由 splitsize 决定,首先按 splitsize 切分数据,若剩余数据大小 < 1.1*splitsize ,则将剩余数据作为一个 split,splitsize 的取值方法:取 minSplit , maxSplit , blocksize 的中间值。
reduce 数大部分场景下可由用户指定。
combiner对于取最大值、词频统计场景,可以使用 combiner 对数据在 map 端进行 reduce 计算,对 sort 场景,必须将所有 value 值传给 reduce 端,combiner 不适用。
中间值的压缩
自定义序列
调整 shuffle

4. 数据类型和格式

4.1 MapReduce 的数据类型

4.2 输入格式

格式功能
DBInputFormat
DBOutputFormat
DBInputFormat用于使用 JDBC 从关系型数据库中读取数据。
DBOutputFormat将作业输出数据转储到数据库中。
MultipleInputs
TableInputFormat让 MapReduce 操作存放在 HBase 表的数据。
TableOutputFormat把 MapReduce 的输出写入到 Hbase 表。

4.3 输出格式

参考资料