当前位置: 首页 > 知识库问答 >
问题:

具有混合数据源的MapReduce作业:HBase表和HDFS文件

索令
2023-03-14

我需要实现一个从HBase表和HDFS文件访问数据的MR作业。E、 例如,映射器从HBase表和HDFS文件读取数据,这些数据共享相同的主键,但具有不同的模式。然后,reducer将所有列(来自HBase表和HDFS文件)连接在一起。

我试着在线查看,但找不到使用这种混合数据源运行MR作业的方法。MultipleInput似乎只适用于多个HDFS数据源。如果您有一些想法,请告诉我。示例代码会很棒。

共有3个答案

姬衡
2023-03-14

猪脚本或配置单元查询可以很容易地做到这一点。

样本猪脚本

tbl = LOAD 'hbase://SampleTable'
       USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
       'info:* ...', '-loadKey true -limit 5')
       AS (id:bytearray, info_map:map[],...);

fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);

Joined = JOIN A tbl by id,fle by id;
STORE Joined to ...
郭盛
2023-03-14

没有支持此功能的OOTB功能。一种可能的解决方法是先扫描HBase表并将结果写入HDFS文件,然后使用多个输入执行reduce端连接。但这将引起一些额外的I/O开销。

秦才
2023-03-14

经过几天的调查(并得到HBase用户邮件列表的帮助),我终于想出了怎么做。下面是源代码:

public class MixMR {

public static class Map extends Mapper<Object, Text, Text, Text> {

    public void map(Object key, Text value, Context context) throws IOException,   InterruptedException {
        String s = value.toString();
        String[] sa = s.split(",");
        if (sa.length == 2) {
            context.write(new Text(sa[0]), new Text(sa[1]));
        }

    }

}

public static class TableMap extends TableMapper<Text, Text>  {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR1 = "c1".getBytes();

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {

        String key = Bytes.toString(row.get());
        String val = new String(value.getValue(CF, ATTR1));

        context.write(new Text(key), new Text(val));
    }
}


public static class Reduce extends Reducer  <Object, Text, Object, Text> {
    public void reduce(Object key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String ks = key.toString();
        for (Text val : values){
            context.write(new Text(ks), val);
        }

    }
}

public static void main(String[] args) throws Exception {
Path inputPath1 = new Path(args[0]);
    Path inputPath2 = new Path(args[1]);
    Path outputPath = new Path(args[2]);

    String tableName = "test";

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleRead");
    job.setJarByClass(MixMR.class);     // class that contains mapper

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.addFamily(Bytes.toBytes("cf"));

    TableMapReduceUtil.initTableMapperJob(
            tableName,        // input HBase table name
              scan,             // Scan instance to control CF and attribute selection
              TableMap.class,   // mapper
              Text.class,             // mapper output key
              Text.class,             // mapper output value
              job);


    job.setReducerClass(Reduce.class);    // reducer class
    job.setOutputFormatClass(TextOutputFormat.class);   


    // inputPath1 here has no effect for HBase table
    MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
    MultipleInputs.addInputPath(job, inputPath2,  TableInputFormat.class, TableMap.class);

    FileOutputFormat.setOutputPath(job, outputPath); 

    job.waitForCompletion(true);
}

}

 类似资料:
  • 我正在尝试创建一个地图减少工作在Java的表从一个HBase数据库。使用这里的示例和internet上的其他内容,我成功地编写了一个简单的行计数器。然而,试图编写一个实际对列中的数据执行某些操作的程序是不成功的,因为接收的字节总是空的。 我的司机工作的一部分是这样的: 如您所见,该表称为。我的映射器如下所示: 一些注意事项: 表中的列族只是。有多个列,其中一些列称为和(第一次看到); 即使值正确显

  • 我正在尝试编写一个关于HBase表中存在的数据的链接MapReduce作业,需要一些关于这个概念的帮助。我并不期望人们通过伪代码来提供代码,因为基于HBase的Java API会很好。 简单来说,我想做的是,

  • https://cloud.google.com/blog/products/data-analytics/how-to-how高效处理实时和聚合数据 用例说明步骤: 从pubsub获取流式原始事件。 验证接收的原始事件。 筛选特定类型的事件。 创建筛选事件的字典。 同时,将筛选的事件通过窗口操作传递并聚合。 2种输出类型-原始事件字典、聚合事件字典。 按照上面链接中解释的设计,原始事件字典属于低

  • 现在我正在编写一个 Java 程序,使用哈道普映射还原将输出写入 HBase。问题是关于合并器类的。因为现在我的 reduce 类扩展了 TableReducer,而不是化简器。那么我的合并器类呢,它应该也扩展表还原器,还是仍然扩展化简器?

  • 本文向大家介绍通用MapReduce程序复制HBase表数据,包括了通用MapReduce程序复制HBase表数据的使用技巧和注意事项,需要的朋友参考一下 编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。 原始表test1数据如下: 每个row key都有两

  • 合并器在映射器之后、缩减器之前运行,它将接收由给定节点上的映射器实例发出的所有数据作为输入。然后输出到减速器。 而且,如果一个化简函数既是可交换的又是结合的,那么它可以用作组合器。 我的问题是,在这种情况下,“交换和结合”这个短语是什么意思?