当前位置: 首页 > 知识库问答 >
问题:

在使用MapReduce进行HBase扫描的过程中,Reducer的数量始终为1

毋玺
2023-03-14

我在Mapper中进行HBase扫描,然后Reducer将结果写入HDFS。
映射器输出的记录数大约为1,000,000,000。

问题是化简器的数量总是一个,尽管我设置了 -Dmapred.reduce.tasks=100。减少过程非常缓慢。

//编辑时间:2016-12-04祝方泽
我的主类的代码:

public class GetUrlNotSent2SpiderFromHbase extends Configured implements Tool {

public int run(String[] arg0) throws Exception {

    Configuration conf = getConf();
    Job job = new Job(conf, conf.get("mapred.job.name"));
    String input_table = conf.get("input.table");       

    job.setJarByClass(GetUrlNotSent2SpiderFromHbase.class);

    Scan scan = new Scan();
    scan.setCaching(500);
    scan.setCacheBlocks(false);
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sitemap_type"));
    scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("is_send_to_spider"));

    TableMapReduceUtil.initTableMapperJob(
            input_table, 
            scan, 
            GetUrlNotSent2SpiderFromHbaseMapper.class, 
            Text.class, 
            Text.class, 
            job);

    /*job.setMapperClass(GetUrlNotSent2SpiderFromHbaseMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);*/

    job.setReducerClass(GetUrlNotSent2SpiderFromHbaseReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    if (job.waitForCompletion(true) && job.isSuccessful()) {
        return 0;
    }
    return -1;
}

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int res = ToolRunner.run(conf, new GetUrlNotSent2SpiderFromHbase(), args);
    System.exit(res);
}

}

以下是运行此MapReduce作业的脚本:

table="xxx"
output="yyy"
sitemap_type="zzz"

JOBCONF=""
JOBCONF="${JOBCONF} -Dmapred.job.name=test_for_scan_hbase"
JOBCONF="${JOBCONF} -Dinput.table=$table"
JOBCONF="${JOBCONF} -Dmapred.output.dir=$output"
JOBCONF="${JOBCONF} -Ddemand.sitemap.type=$sitemap_type"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.command-opts='-Xmx8192m'"
JOBCONF="${JOBCONF} -Dyarn.app.mapreduce.am.resource.mb=9216"
JOBCONF="${JOBCONF} -Dmapreduce.map.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.map.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.java.opts='-Xmx1536m'"
JOBCONF="${JOBCONF} -Dmapreduce.reduce.memory.mb=2048"
JOBCONF="${JOBCONF} -Dmapred.reduce.tasks=100"
JOBCONF="${JOBCONF} -Dmapred.job.priority=VERY_HIGH"

hadoop fs -rmr $output
hadoop jar get_url_not_sent_2_spider_from_hbase_hourly.jar hourly.GetUrlNotSent2SpiderFromHbase $JOBCONF
echo "===== scan HBase finished ====="

我设置了<code>作业。SETNUMREDUCTASKS(100) 在代码中,它起作用了。

共有1个答案

段兴为
2023-03-14

由于您提到只有一个reduce正在工作,这就是reduce非常慢的明显原因。

将以下方法添加到上述作业驱动程序中,以打印从所有可能来源(即从-D或其他地方)应用的配置条目。提交作业前,请在驱动程序中添加此方法调用:

public static void printConfigApplied(Configuration conf) 
     try {
                conf.writeXml(System.out);
            } catch (final IOException e) {
                e.printStackTrace();
            }
}

这证明您的系统属性不是从命令行应用的,即-Dxxx,因此您传递系统属性的方式不正确。因为从语法上来说。

由于job.setnumreducetasks正在运行,我强烈怀疑您的系统属性没有正确传递给驱动程序。

 Configuration conf = getConf();
    Job job = new Job(conf, conf.get("mapred.job.name"));

将此更改为此处的示例

 类似资料:
  • 我需要在HBase中使用Scan来扫描满足某些条件的所有行:这就是为什么我要使用过滤器(实际上是一个包含两个SingleColumnValueFilter的复合过滤器列表)的原因。现在,我的rowKeys结构如下: null 提前感谢 Andrea

  • 发生的事情 由于系统中的错误,上个月的所有数据都已损坏。所以我们不得不手动删除并重新输入这些记录。基本上,我想删除在某段时间内插入的所有行。但是,我发现很难在HBase中扫描和删除数百万行。 可能的解决方案 我找到了两种批量删除的方法: 第一种是设置一个TTL,这样所有过期的记录都会被系统自动删除。但是我想保留上个月之前插入的记录,所以这个解决方案对我不起作用。 第二种选择是使用Java API编

  • 我正在编写一个程序,将给定的整数简化为它们的最简单的比率。但是当通过Scanner类在一个子方法中获取输入时发生了一个错误。下面是代码:

  • 例如,对于hbase表“test_table”,插入的值为: 在扫描“test_table”时,其中version=t+4应返回 如何在HBase中实现基于时间戳的扫描(基于小于或等于时间戳的最新可用值)?

  • 我想在HBase中执行查询操作,以使用提供的行键列表提取记录。由于MapReduce中的Mappers是并行工作的,所以我想使用它。 行键的输入列表将在~100000的范围内,我已经为映射器创建了一个,它将为每个映射器提供1000行键的列表以查询HBase表。这些查询的记录可能存在于 HBase 表中,也可能不存在,我只想返回那些存在的记录。 我看到了各种各样的例子,我发现hbase table操

  • 导出HADOOP_CLASSPATH=“