我需要实现一个从HBase表和HDFS文件访问数据的MR作业。E、 例如,映射器从HBase表和HDFS文件读取数据,这些数据共享相同的主键,但具有不同的模式。然后,reducer将所有列(来自HBase表和HDFS文件)连接在一起。
我试着在线查看,但找不到使用这种混合数据源运行MR作业的方法。MultipleInput似乎只适用于多个HDFS数据源。如果您有一些想法,请告诉我。示例代码会很棒。
猪脚本或配置单元查询可以很容易地做到这一点。
样本猪脚本
tbl = LOAD 'hbase://SampleTable'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'info:* ...', '-loadKey true -limit 5')
AS (id:bytearray, info_map:map[],...);
fle = LOAD '/somefile' USING PigStorage(',') AS (id:bytearray,...);
Joined = JOIN A tbl by id,fle by id;
STORE Joined to ...
没有支持此功能的OOTB功能。一种可能的解决方法是先扫描HBase表并将结果写入HDFS文件,然后使用多个输入执行reduce端连接。但这将引起一些额外的I/O开销。
经过几天的调查(并得到HBase用户邮件列表的帮助),我终于想出了怎么做。下面是源代码:
public class MixMR {
public static class Map extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String s = value.toString();
String[] sa = s.split(",");
if (sa.length == 2) {
context.write(new Text(sa[0]), new Text(sa[1]));
}
}
}
public static class TableMap extends TableMapper<Text, Text> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR1 = "c1".getBytes();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
String key = Bytes.toString(row.get());
String val = new String(value.getValue(CF, ATTR1));
context.write(new Text(key), new Text(val));
}
}
public static class Reduce extends Reducer <Object, Text, Object, Text> {
public void reduce(Object key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String ks = key.toString();
for (Text val : values){
context.write(new Text(ks), val);
}
}
}
public static void main(String[] args) throws Exception {
Path inputPath1 = new Path(args[0]);
Path inputPath2 = new Path(args[1]);
Path outputPath = new Path(args[2]);
String tableName = "test";
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MixMR.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
scan.addFamily(Bytes.toBytes("cf"));
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control CF and attribute selection
TableMap.class, // mapper
Text.class, // mapper output key
Text.class, // mapper output value
job);
job.setReducerClass(Reduce.class); // reducer class
job.setOutputFormatClass(TextOutputFormat.class);
// inputPath1 here has no effect for HBase table
MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, Map.class);
MultipleInputs.addInputPath(job, inputPath2, TableInputFormat.class, TableMap.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
}
我正在尝试创建一个地图减少工作在Java的表从一个HBase数据库。使用这里的示例和internet上的其他内容,我成功地编写了一个简单的行计数器。然而,试图编写一个实际对列中的数据执行某些操作的程序是不成功的,因为接收的字节总是空的。 我的司机工作的一部分是这样的: 如您所见,该表称为。我的映射器如下所示: 一些注意事项: 表中的列族只是。有多个列,其中一些列称为和(第一次看到); 即使值正确显
我正在尝试编写一个关于HBase表中存在的数据的链接MapReduce作业,需要一些关于这个概念的帮助。我并不期望人们通过伪代码来提供代码,因为基于HBase的Java API会很好。 简单来说,我想做的是,
https://cloud.google.com/blog/products/data-analytics/how-to-how高效处理实时和聚合数据 用例说明步骤: 从pubsub获取流式原始事件。 验证接收的原始事件。 筛选特定类型的事件。 创建筛选事件的字典。 同时,将筛选的事件通过窗口操作传递并聚合。 2种输出类型-原始事件字典、聚合事件字典。 按照上面链接中解释的设计,原始事件字典属于低
现在我正在编写一个 Java 程序,使用哈道普映射还原将输出写入 HBase。问题是关于合并器类的。因为现在我的 reduce 类扩展了 TableReducer,而不是化简器。那么我的合并器类呢,它应该也扩展表还原器,还是仍然扩展化简器?
本文向大家介绍通用MapReduce程序复制HBase表数据,包括了通用MapReduce程序复制HBase表数据的使用技巧和注意事项,需要的朋友参考一下 编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。 原始表test1数据如下: 每个row key都有两
合并器在映射器之后、缩减器之前运行,它将接收由给定节点上的映射器实例发出的所有数据作为输入。然后输出到减速器。 而且,如果一个化简函数既是可交换的又是结合的,那么它可以用作组合器。 我的问题是,在这种情况下,“交换和结合”这个短语是什么意思?