Question:

An example of running MapReduce on an HDFS file and storing the reducer results in an HBase table

邓仲卿
2023-03-14

Can someone share a link to an HBase MapReduce example? My requirement is to run MapReduce on an HDFS file and store the reducer output in an HBase table. The mapper input will be an HDFS file, and its output will be Text/IntWritable key-value pairs. The reducer output will be Put objects, i.e., it should sum the reducer's Iterable<IntWritable> values and store the result in the HBase table.

2 answers in total

龙凯
2023-03-14

**Check the code below; it works for me with Phoenix, HBase, and MapReduce.**

This program reads data from an HBase table (via Phoenix) and, after the map-reduce job, inserts the results into another table.

Tables:
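The original answer does not include the table DDL. As a rough sketch only (the column names are taken from the job code below, but the primary keys and exact types are my assumptions, not the original schema), the two Phoenix tables could look something like this:

-- Sketch only: column names come from the MapReduce code below;
-- the key choice and exact types are assumptions.
CREATE TABLE IF NOT EXISTS STOCK (
    STOCK_NAME VARCHAR NOT NULL,
    RECORDING_YEAR INTEGER NOT NULL,
    RECORDINGS_QUARTER DOUBLE ARRAY,
    CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR)
);

CREATE TABLE IF NOT EXISTS STOCK_STATS (
    STOCK_NAME VARCHAR NOT NULL PRIMARY KEY,
    MAX_RECORDING DOUBLE
);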

StockComputationJob.java

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

public class StockComputationJob {

    // Emits the maximum quarterly recording per input row, keyed by stock name.
    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

    private Text stock = new Text(); 
    private DoubleWritable price = new DoubleWritable ();

    @Override
    protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
       double[] recordings = stockWritable.getRecordings();
       final String stockName = stockWritable.getStockName();
       System.out.println("Map-"+recordings);
       double maxPrice = Double.MIN_VALUE;
       for(double recording : recordings) {
           System.out.println("M-"+key+"-"+recording);
         if(maxPrice < recording) {
          maxPrice = recording;
             }
       }
       System.out.println(stockName+"--"+maxPrice);
       stock.set(stockName);
       price.set(maxPrice);
       context.write(stock,price);
    }

}

    public static void main(String[] args) throws Exception {

         final Configuration conf = new Configuration();
         HBaseConfiguration.addHbaseResources(conf);
         // Assumed placeholder: point this at your own ZooKeeper quorum.
         final String zkUrl = "zk-host:2181";
         conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
         final Job job = Job.getInstance(conf, "stock-stats-job");
      // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
         final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

         // StockWritable is the DBWritable class that enables us to process the Result of the above query
         PhoenixMapReduceUtil.setInput(job,StockWritable.class,"STOCK",selectQuery);  

         // Set the target Phoenix table and the columns
         PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

         job.setMapperClass(StockMapper.class);
         job.setReducerClass(StockReducer.class); 
         job.setOutputFormatClass(PhoenixOutputFormat.class);

         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(DoubleWritable.class);
         job.setOutputKeyClass(NullWritable.class);
         job.setOutputValueClass(StockWritable.class); 
         TableMapReduceUtil.addDependencyJars(job);
         job.waitForCompletion(true);
     }

}

StockReducer.java

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reduces the per-row maxima emitted by StockMapper to a single overall
// maximum per stock, wrapped in a StockWritable for the Phoenix output.
public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
        double maxPrice = Double.MIN_VALUE;
        for (DoubleWritable recording : recordings) {
            System.out.println("R-" + key + "-" + recording);
            if (maxPrice < recording.get()) {
                maxPrice = recording.get();
            }
        }
        final StockWritable stock = new StockWritable();
        stock.setStockName(key.toString());
        stock.setMaxPrice(maxPrice);
        System.out.println(key + "--" + maxPrice);
        context.write(NullWritable.get(), stock);
    }
}

StockWritable.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class StockWritable implements DBWritable, Writable {

      private String stockName;

        private int year;

        private double[] recordings;

        private double maxPrice;   

        // Unused here: StockWritable never crosses the map/reduce shuffle
        // (the map output types are Text/DoubleWritable), so the Writable
        // methods can safely remain empty.
        @Override
        public void readFields(DataInput input) throws IOException {
        }

        @Override
        public void write(DataOutput output) throws IOException {
        }

        // Maps one row of the input SELECT query onto this object.
        @Override
        public void readFields(ResultSet rs) throws SQLException {
           stockName = rs.getString("STOCK_NAME");
           setYear(rs.getInt("RECORDING_YEAR"));
           final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
           setRecordings((double[]) recordingsArray.getArray());
        }

        // Binds the output columns (STOCK_NAME, MAX_RECORDING) for the
        // upsert into STOCK_STATS.
        @Override
        public void write(PreparedStatement pstmt) throws SQLException {
           pstmt.setString(1, stockName);
           pstmt.setDouble(2, maxPrice);
        }

        public int getYear() {
            return year;
        }

        public void setYear(int year) {
            this.year = year;
        }

        public double[] getRecordings() {
            return recordings;
        }

        public void setRecordings(double[] recordings) {
            this.recordings = recordings;
        }

        public double getMaxPrice() {
            return maxPrice;
        }

        public void setMaxPrice(double maxPrice) {
            this.maxPrice = maxPrice;
        }

        public String getStockName() {
            return stockName;
        }

        public void setStockName(String stockName) {
            this.stockName = stockName;
        }


}
魏彦
2023-03-14

Here is code that should solve your problem:

// Driver: read a text file from HDFS, map it to (Text, IntWritable) pairs,
// and let a TableReducer write the reduced results into an HBase table.
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "JOB_NAME");
job.setJarByClass(YourDriver.class);
job.setMapperClass(YourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
// initTableReducerJob wires up TableOutputFormat and registers
// YourReducer, so no separate setReducerClass call is needed.
TableMapReduceUtil.initTableReducerJob(TABLE, YourReducer.class, job);
job.waitForCompletion(true);

class YourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // @Override map()
}

class YourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    // @Override reduce()
}
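To make that skeleton concrete for the question (sum the Iterable<IntWritable> values and store them as Put objects), here is a minimal word-count-style sketch. The class names, the target table "WORD_COUNTS", the column family "cf", and the qualifier "count" are placeholders I chose, not from the original answer:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HdfsToHBaseJob {

    // Tokenizes each line of the HDFS input file and emits (word, 1) pairs.
    public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Sums the counts for each key and writes one Put per key to HBase.
    public static class PutReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Row key = the mapper's Text key; "cf"/"count" are placeholder
            // column family and qualifier names.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hdfs-to-hbase");
        job.setJarByClass(HdfsToHBaseJob.class);
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // "WORD_COUNTS" is a placeholder target table; it must already exist.
        TableMapReduceUtil.initTableReducerJob("WORD_COUNTS", PutReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that the target table (here the placeholder WORD_COUNTS with column family cf) must be created before the job runs, for example via the HBase shell.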

  • 我有一个用户类,有16个属性,比如名字,姓氏,出生日期,用户名,密码等...这些都存储在MySQL数据库中,当我想要检索用户时,我使用ResultSet。我想将每一列映射回用户属性,但我这样做的效率似乎非常低。例如,我正在做: 也就是说,我检索所有的列,然后通过将所有的列值插入用户构造函数来创建用户对象。 有人知道更快、更整洁的方法吗?