当前位置: 首页 > 知识库问答 >
问题:

Hadoop mapreduce - reducer未运行

东郭瀚玥
2023-03-14

我试图将批量加载map-reduce定制到HBase中,我遇到了reducer的问题。起初我认为我没有写好reducer,但是在reducer中抛出运行时异常并看到代码工作时,我意识到reducer根本没有运行。到目前为止,我看不出这个问题的一些常见答案有什么问题;

  1. 我的配置将mapoutput和output分开。
  2. 我的减速器和映射器具有覆盖功能。
  3. 我有Iterable,我的reducer输入是(writable,put),所以

这是我的代码:

司机

public int run(String[] args) throws Exception {
    int result=0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name",TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);
    Job job = new Job(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);
    job.setReducerClass(HBaseBulkLoadReducer.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    FileInputFormat.addInputPaths(job, args[0]);
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    job.setNumReduceTasks(1);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
    job.waitForCompletion(true);

制图员

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String hbaseTable;
    private String dataSeperator;
    private String columnFamily1;
    private ImmutableBytesWritable hbaseTableName;

    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        hbaseTable = configuration.get("hbase.table.name");
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        hbaseTableName = new ImmutableBytesWritable(Bytes.toBytes(hbaseTable));
    }
        @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            BUNCH OF ADDS;
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }
}

减速机

public class HBaseBulkLoadReducer extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
      @Override
      protected void reduce(
          ImmutableBytesWritable row,
          Iterable<Put> puts,
          Reducer<ImmutableBytesWritable, Put,
                  ImmutableBytesWritable, Put>.Context context)
          throws java.io.IOException, InterruptedException
      {
        TreeMap<String,KeyValue> map = new TreeMap<String,KeyValue>();
        int count =0;
        Append nkv;
        byte[] tmp= "".getBytes();
        Put pp = new Put(tmp);
    try{
        for (Put p : puts) {
              byte[] r =  "".getBytes();
              //KeyValue kv = new KeyValue(r);
              if (count!=0){
              r = p.getRow();
              pp.add(new KeyValue(r));
              //KeyValue k = map.get(row.toString());
              //nkv = new Append(k.getRowArray());
              //nkv=nkv.add(kv);
              //map.put(row.toString(), k.clone());
              //context.write(row,nkv);
              //tmp=ArrayUtils.addAll(tmp,kv.getValueArray());
              //map.put(row.toString(),new KeyValue(kv.getRowArray(),kv.getFamilyArray(),kv.getQualifierArray(),tmp));
              count++;
              throw new RuntimeException();
              }
              else{
              r = p.getRow();
              pp = new Put(row.toString().getBytes());
              pp.add(new KeyValue(r));
              //tmp=kv.clone().getValueArray();
              //nkv = new Append(kv.getRowArray());
              //map.put(row.toString(), kv.clone());
              count++;
              throw new RuntimeException();
          }
     }
      context.write(row,pp);
      }catch(Exception e) { e.printStackTrace();}
     }

}

我知道reducer有点混乱,但问题是,正如您所看到的,它在if和else子句上都有runtimeException,并且批量加载成功,所以我很确定reducer没有运行-我不确定为什么。所有三个文件都被maven打包在同一个目录中,仅供参考。

共有1个答案

蒋星驰
2023-03-14

弄清楚了哪里出了问题。配置增量加载根据输出值将化简器类设置为 putort 或键值排序,因此,如果我想使用自定义化简器类,则必须在配置增量加载后进行设置。在那之后,我可以看到减速器在运行。只是回答我自己的问题,这样它可能有助于遇到相同问题的人。

HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));
job.setReducerClass(HBaseBulkLoadReducer.class);
job.waitForCompletion(true);
 类似资料:
  • 减速器 自定义可写类

  • 我试图运行WordCount示例的一个变体,这个变体是,映射器输出文本作为键和文本作为值,而还原器输出文本作为键和NullWritable作为值。 除了地图,减少签名,我把主要的方法是这样的:

  • 目录 如何在 reducer 之间共享 state? combineReducers 是必须的吗? 处理 action 必须用 switch 语句吗? Reducer 如何在 reducer 之间共享 state? combineReducers 是必须的吗? Redux store 推荐的结构是将 state 对象按键值切分成 “层”(slice) 或者 “域”(domain),并提供独立的 r

  • Reducers 指定了应用状态的变化如何响应 actions 并发送到 store 的,记住 actions 只是描述了有事情发生了这一事实,并没有描述应用如何更新 state。 设计 State 结构 在 Redux 应用中,所有的 state 都被保存在一个单一对象中。建议在写代码前先想一下这个对象的结构。如何才能以最简的形式把应用的 state 用对象描述出来? 以 todo 应用为例,需

  • 作为核心概念, Redux 真的是一种十分简单的设计模式:所有你“写”的逻辑都集中在一个单独的函数中,并且执行这些逻辑的唯一方式就是传给 Redux 一个能够描述当时情景的普通对象(plain object)。Redux store 调用这些逻辑函数,并传入当前的 state tree 以及这些描述对象,返回新的 state tree,接着 Redux store 便开始通知这些订阅者(subsc

  • 我只是使用3机器集群测试单词计数示例。我的代码与此示例相同,但以下代码除外: