有人能给一个Hbase的mapduce链接吗?我的要求是在hdfs文件上运行mapduce,并将减速器输出存储到hbase表中。Mapper输入将是hdfs文件,输出将是Text、IntWritable键值对。减速器输出将是put对象,即添加减速器Iterable IntWritable值并存储在hbase表中。
**Ckeck波纹管代码,适用于我与凤凰Hbase和地图减少**
该程序将从Hbase表中读取数据,并在map-reduce作业之后将结果插入另一个表中。
表格:-
StockComputationJob.java
public static class StockMapper extends Mapper<NullWritable, StockWritable, Text , DoubleWritable> {
private Text stock = new Text();
private DoubleWritable price = new DoubleWritable ();
@Override
protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
double[] recordings = stockWritable.getRecordings();
final String stockName = stockWritable.getStockName();
System.out.println("Map-"+recordings);
double maxPrice = Double.MIN_VALUE;
for(double recording : recordings) {
System.out.println("M-"+key+"-"+recording);
if(maxPrice < recording) {
maxPrice = recording;
}
}
System.out.println(stockName+"--"+maxPrice);
stock.set(stockName);
price.set(maxPrice);
context.write(stock,price);
}
}
public static void main(String[] args) throws Exception {
final Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
final Job job = Job.getInstance(conf, "stock-stats-job");
// We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";
// StockWritable is the DBWritable class that enables us to process the Result of the above query
PhoenixMapReduceUtil.setInput(job,StockWritable.class,"STOCK",selectQuery);
// Set the target Phoenix table and the columns
PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");
job.setMapperClass(StockMapper.class);
job.setReducerClass(StockReducer.class);
job.setOutputFormatClass(PhoenixOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(StockWritable.class);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true);
}
}
StockReducer.java
public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable , StockWritable> {
protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
double maxPrice = Double.MIN_VALUE;
System.out.println(recordings);
for(DoubleWritable recording : recordings) {
System.out.println("R-"+key+"-"+recording);
if(maxPrice < recording.get()) {
maxPrice = recording.get();
}
}
final StockWritable stock = new StockWritable();
stock.setStockName(key.toString());
stock.setMaxPrice(maxPrice);
System.out.println(key+"--"+maxPrice);
context.write(NullWritable.get(),stock);
}
}
StockWritable.java
public class StockWritable implements DBWritable,Writable {
private String stockName;
private int year;
private double[] recordings;
private double maxPrice;
public void readFields(DataInput input) throws IOException {
}
public void write(DataOutput output) throws IOException {
}
public void readFields(ResultSet rs) throws SQLException {
stockName = rs.getString("STOCK_NAME");
setYear(rs.getInt("RECORDING_YEAR"));
final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
setRecordings((double[])recordingsArray.getArray());
}
public void write(PreparedStatement pstmt) throws SQLException {
pstmt.setString(1, stockName);
pstmt.setDouble(2, maxPrice);
}
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public double[] getRecordings() {
return recordings;
}
public void setRecordings(double[] recordings) {
this.recordings = recordings;
}
public double getMaxPrice() {
return maxPrice;
}
public void setMaxPrice(double maxPrice) {
this.maxPrice = maxPrice;
}
public String getStockName() {
return stockName;
}
public void setStockName(String stockName) {
this.stockName = stockName;
}
}
这是可以解决你的问题的代码
HBaseConfiguration conf = HBaseConfiguration.create();
Job job = new Job(conf,"JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Intwritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
TableMapReduceUtil.initTableReducerJob(TABLE,
yourReducer.class, job);
job.setReducerClass(yourReducer.class);
job.waitForCompletion(true);
class yourMapper extends Mapper<LongWritable, Text, Text,IntWritable> {
//@overide map()
}
class yourReducer
extends
TableReducer<Text, IntWritable,
ImmutableBytesWritable>
{
//@override rdeuce()
}
我有一个简单的MapReduce作业,它应该从文本文件中读取字典,然后逐行处理另一个大文件并计算逆文档矩阵。输出应该如下所示: 但是,减速器的输出只在一个huuuge行中发出。我不明白为什么它应该为每个(这是减速器的关键)发出新行。 映射器生成正确的输出(一对<code>单词id的值在单独的行中)。我在没有减速器的情况下进行了测试。reducer应该只为每个键在一行中附加与相同键对应的值。 你能看
问题内容: 下面的Mappers代码从HDFS读取文本文件正确吗?如果是这样的话: 如果不同节点中的两个映射器尝试几乎同时打开文件,会发生什么情况? 是否不需要关闭?如果是这样,如何在不关闭文件系统的情况下执行此操作? 我的代码是: 问题答案: 这将起作用,并进行一些修改-我假设您粘贴的代码被截断了: 您可以有多个映射器读取同一个文件,但是使用分布式缓存存在更多的局限性(不仅减少了承载文件块的数据
嗨,我试图实现的是将SQL原生查询结果映射到java spring jpa存储库中的DTO中,如何正确地做到这一点?我尝试了几个代码,但都不起作用,下面是我所尝试的: 第二个错误是: 无法提取ResultSet;SQL[N/A];嵌套异常是org.hibernate.exception.SqlGrammareXception:无法提取ResultSet 下面是我的DTO:
我遇到了一个非常非常奇怪的问题。还原器确实工作,但如果我检查输出文件,我只能找到映射器的输出。当我尝试调试时,在将映射器的输出值类型从Longwritable更改为Text之后,我发现了与单词计数示例相同的问题 这是结果。 然后我在输出文件中发现了奇怪的结果。这个问题发生在我将map的输出值类型和reducer的输入键类型更改为Text之后,无论我是否更改了reduce输出值的类型。我还被迫更改j
JPA存储库方法是:@query(“select date,sum(crAmt),sum(drAmt)from Daybook u where u date=?1”) public DaybookBalance findDaybookBalance(字符串d1); 我得到以下错误
我有一个用户类,有16个属性,比如名字,姓氏,出生日期,用户名,密码等...这些都存储在MySQL数据库中,当我想要检索用户时,我使用ResultSet。我想将每一列映射回用户属性,但我这样做的效率似乎非常低。例如,我正在做: 也就是说,我检索所有的列,然后通过将所有的列值插入用户构造函数来创建用户对象。 有人知道更快、更整洁的方法吗?