当前位置: 首页 > 面试题库 >

以编程方式将数据批量加载到HBase的最快方法是什么?

萧自珍
2023-03-14
问题内容

我有一个纯文本文件,其中可能包含数百万行,需要自定义解析,我想将其尽快加载到HBase表中(使用Hadoop或HBase Java客户端)。

我当前的解决方案基于没有Reduce部件的 MapReduce
作业。我FileInputFormat用来读取文本文件,以便每一行都传递给类的map方法Mapper。此时,将对行进行解析以形成一个Put对象,该对象将写入context。然后,TableOutputFormat获取Put对象并将其插入到表中。

该解决方案产生的平均插入率为每秒1,000行,这比我预期的要低。 我的HBase设置在单个服务器上处于伪分布式模式。

一件有趣的事是,在插入1,000,000行的过程中,生成了25个Mapper(任务),但是它们是串行运行的(一个接一个)。这正常吗?

这是我当前解决方案的html" target="_blank">代码:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    protected void map(LongWritable key, Text value, Context context) throws IOException {
        Map<String, String> parsedLine = parseLine(value.toString());

        Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
        for (String currentKey : parsedLine.keySet()) {
            row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
        }

        try {
            context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public int run(String[] args) throws Exception {
    if (args.length != 2) {
        return -1;
    }

    conf.set("hbase.mapred.outputtable", args[1]);

    // I got these conf parameters from a presentation about Bulk Load
    conf.set("hbase.hstore.blockingStoreFiles", "25");
    conf.set("hbase.hregion.memstore.block.multiplier", "8");
    conf.set("hbase.regionserver.handler.count", "30");
    conf.set("hbase.regions.percheckin", "30");
    conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
    conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

    Job job = new Job(conf);
    job.setJarByClass(BulkLoadMapReduce.class);
    job.setJobName(NAME);
    TextInputFormat.setInputPaths(job, new Path(args[0]));
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(CustomMap.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(TableOutputFormat.class);

    job.waitForCompletion(true);
    return 0;
}

public static void main(String[] args) throws Exception {
    Long startTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("Start time : " + startTime);

    int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

    Long endTime = Calendar.getInstance().getTimeInMillis();
    System.out.println("End time : " + endTime);
    System.out.println("Duration milliseconds: " + (endTime-startTime));

    System.exit(errCode);
}

问题答案:

我经历的过程可能与您尝试找到一种将MR中的数据加载到HBase的有效方法的过程非常相似。我发现工作的是HFileOutputFormat用作MR的OutputFormatClass。

以下是我必须生成的代码的基础job以及map写出数据的Mapper 函数。很快
我们不再使用它,因此我手边没有数字,但是在一分钟内大约有250万条记录。

这是我编写的(分解后的)函数,用于为MapReduce流程生成作业以将数据放入HBase

private Job createCubeJob(...) {
    //Build and Configure Job
    Job job = new Job(conf);
    job.setJobName(jobName);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
    job.setJarByClass(CubeBuilderDriver.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    TextInputFormat.setInputPaths(job, hiveOutputDir);
    HFileOutputFormat.setOutputPath(job, cubeOutputPath);

    Configuration hConf = HBaseConfiguration.create(conf);
    hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
    hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

    HTable hTable = new HTable(hConf, tableName);

    HFileOutputFormat.configureIncrementalLoad(job, hTable);
    return job;
}

这是我来自HiveToHBaseMapper该类的地图函数(略作编辑)。

public void map(WritableComparable key, Writable val, Context context)
        throws IOException, InterruptedException {
    try{
        Configuration config = context.getConfiguration();
        String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
        String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
        String column = strs[COLUMN_INDEX];
        String Value = strs[VALUE_INDEX];
        String sKey = generateKey(strs, config);
        byte[] bKey = Bytes.toBytes(sKey);
        Put put = new Put(bKey);
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) 
                        ? Bytes.toBytes(Double.MIN_VALUE)
                        : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);

        context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
    }
    catch(Exception e){
        context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);    
    }

}

我很确定这不会成为您的复制粘贴解决方案。显然,我在这里使用的数据不需要任何自定义处理(在此之前在MR作业中完成)。我要提供的主要内容是
HFileOutputFormat 。其余只是我如何使用它的一个示例。:)
我希望它能使您踏上寻求良好解决方案的坚实道路。:



 类似资料:
  • 将单个(或几个)宽行从Cassandra加载到C#的最高效的性能方法是什么?我的宽行有10.000-10.000列。主键由几个值组成,但是列键是一个字符串,列值是一个计数器(请参见下面的模式)。

  • 问题内容: 我想创建一个指令,该指令根据来自服务的值来检查dom中是否应包含元素(例如,检查用户角色)。 相应的指令如下所示: 最后,该元素具有ng-if属性,但由于某种原因它不适用于该元素,并且仍然存在于dom中。因此,这显然是错误的方法。 这个小提琴显示了问题:http : //jsfiddle.net/L37tZ/2/ 谁能解释为什么会这样?还有其他方法可以实现类似的行为吗?应该考虑现有的n

  • 问题内容: 我有一个很大的数据集,我必须将其转换为.csv格式,我有29列和超过一百万行。我正在使用python和pandas数据框来处理此工作。我认为,随着数据框变大,将任何行追加到它会越来越耗时。我想知道是否有更快的方法,可以共享代码中的相关代码段。 任何建议,但欢迎。 问题答案: 正如Mohit Motwani建议的最快方法是将数据收集到字典中,然后将所有内容加载到数据帧中。下面是一些速度测

  • 问题内容: 我试图自定义现有的JS库,而不修改原始JS代码。这段代码将加载一些我可以访问的外部JS文件,而我想做的就是更改原始文件中包含的功能之一,而无需将整个内容复制并粘贴到第二个JS文件中。 因此,例如,禁区JS可能具有以下功能: 我希望能够以某种方式在该函数中追加或添加一些JS代码。原因主要是在原始的不可触摸的JS中,该功能非常庞大,如果该JS得到更新,则我用它覆盖的功能将过时。 我不确定这

  • 我已经使用objectstream和fileoutputstream将HashMap序列化为一个文件。这是一个非常庞大的HashMap,大约有1.5亿条条目。当我从文件中读回它时,加载它需要很长时间(~40分钟)。 我使用FileOutputStream后跟ObjectOutputStream来序列化对象。然后,我使用ObjectInputStream和FileInputStream读取对象。 有

  • 我做了很多操作,将数字拆分为单独的数字,将数字放入ArrayList并将这些数字一个接一个地传递给其他ArrayList以进行进一步的操作,直到temList为空-然后是下一个比前一个大的数字。 我想知道哪种方法更快。 这两种方法的共同点是: 然后我可以通过两种方式将这些数字逐个传递给其他ArrayList mainList,并在之后删除它们:方式1: 方式2: 哪种方法更快?考虑到这是数十亿次操