当前位置: 首页 > 知识库问答 >
问题:

在HDFS读书,给HBASE写信

梁承恩
2023-03-14

Mapper正在从两个地方读取文件1)用户访问的文章(按国家排序)2)国家统计(国家明智)

两个Mapper的输出都是Text,Text

我正在运行Amazon集群的程序

我的目标是从两个不同的集合中读取数据,并将结果组合起来存储在hbase中。

HDFS到HDFS正在工作。代码在减少67%时卡住了,并给出了如下错误:

17/02/24 10:45:31 INFO mapreduce.Job:  map 0% reduce 0%
17/02/24 10:45:37 INFO mapreduce.Job:  map 100% reduce 0%
17/02/24 10:45:49 INFO mapreduce.Job:  map 100% reduce 67%
17/02/24 10:46:00 INFO mapreduce.Job: Task Id : attempt_1487926412544_0016_r_000000_0, Status : FAILED
Error: java.lang.IllegalArgumentException: Row length is 0
        at org.apache.hadoop.hbase.client.Mutation.checkRow(Mutation.java:565)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:110)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:68)
        at org.apache.hadoop.hbase.client.Put.<init>(Put.java:58)
        at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:45)
        at com.happiestminds.hadoop.CounterReducer.reduce(CounterReducer.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:635)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

驱动程序类是

package com.happiestminds.hadoop;



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class Main extends Configured implements Tool {

    /**
     * @param args
     * @throws Exception
     */
    public static String outputTable = "mapreduceoutput";

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Main(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {


        Configuration config = HBaseConfiguration.create();

        try{
            HBaseAdmin.checkHBaseAvailable(config);
        }
        catch(MasterNotRunningException e){
            System.out.println("Master not running");
            System.exit(1);
        }

        Job job = Job.getInstance(config, "Hbase Test");

        job.setJarByClass(Main.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);



        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ArticleMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, StatisticsMapper.class);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initTableReducerJob(outputTable, CounterReducer.class, job);

        //job.setReducerClass(CounterReducer.class);

        job.setNumReduceTasks(1);


        return job.waitForCompletion(true) ? 0 : 1;
    }

}

减速器等级为

package com.happiestminds.hadoop;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class CounterReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

    public static final byte[] CF = "counter".getBytes();
    public static final byte[] COUNT = "combined".getBytes();


    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {

        String vals = values.toString();
        int counter = 0;

        StringBuilder sbr = new StringBuilder();
        System.out.println(key.toString());
        for (Text val : values) {
            String stat = val.toString();
            if (stat.equals("***")) {
                counter++;
            } else {
                sbr.append(stat + ",");
            }

        }
        sbr.append("Article count : " + counter);


        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString()));
        if (counter != 0) {
            context.write(null, put);
        }

    }



}

属国

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>



        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.2</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.2</version>
        </dependency>



    </dependencies>

共有3个答案

焦宁
2023-03-14

是否可以尝试检查是否插入任何空值?

HBase数据模型不允许零长度行键,它至少应为1个字节。

请在执行put命令之前检查您的reducer代码,是否有一些值被填充为null。

范浩宕
2023-03-14

根据程序抛出的异常,很明显,密钥长度为0,因此在放入hbase之前,您可以检查密钥长度是否为0,然后只有您可以放入hbase。

更清楚为什么hbase不支持密钥长度为0

HBase数据模型不允许0长度的行键,它应该至少是1个字节。0 字节行键保留供内部使用(用于指定空的开始键和结束键)。

宦翔
2023-03-14

一个好的做法是在提交之前验证你的价值。在您的特殊情况下,您可以验证您的密钥和sbr,或者使用适当的通知策略将它们包装到try-catch部分。如果它们不正确,您应该将它们输出到一些日志中,并用新测试用例更新您的单元测试:

 try
 {
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(CF, COUNT, Bytes.toBytes(sbr.toString()));
    if (counter != 0) {
        context.write(null, put);
    }
 }
 catch (IllegalArgumentException ex)
 {
      System.err.println("Error processing record - Key: "+ key.toString() +", values: " +sbr.ToString());
 }
 类似资料:
  • 我是大数据生态系统的新手,有点起步。 我读过几篇关于使用spark流媒体阅读Kafka主题的文章,但我想知道是否可以使用spark作业而不是流媒体阅读Kafka主题?如果是的话,你们能帮我指出一些可以让我开始学习的文章或代码片段吗。 问题的第二部分是以拼花格式向hdfs写信。一旦我读了Kafka的书,我想我会有一个rdd。将此rdd转换为数据帧,然后将数据帧写入拼花文件。这是正确的方法吗。 感谢您

  • 一、Storm集成HDFS 1.1 项目结构 本用例源码下载地址:storm-hdfs-integration 1.2 项目主要依赖 项目主要依赖如下,有两个地方需要注意: 这里由于我服务器上安装的是 CDH 版本的 Hadoop,在导入依赖时引入的也是 CDH 版本的依赖,需要使用 <repository> 标签指定 CDH 的仓库地址; hadoop-common、hadoop-client、

  • 到目前为止,我做了一些研究和ACC。据我所知,Hadoop在HDFS中提供了处理原始数据块(文件)的框架,而HBase是Hadoop之上的数据库引擎,它基本上处理结构化数据而不是原始数据块。Hbase在HDFS上提供了一个逻辑层,就像SQL一样。正确吗?

  • 找到7个项目drwxr-xr-x-hbase用户0 201 4-06-25 18:58/hbase/.tmp ... 但当我运行此命令时,我会得到 yarn-site.xml Hbase配置hbase-site.xml 我可以浏览http://localhost:50070和http://localhost:8088/cluster 在hbase-marc-master-marc-pc.log中,

  • 我是Hadoop的新手。我正在浏览专业Hadoop解决方案的书,以获得一些关于Hadoop和生态系统的知识。我想澄清HDFS和HBase之间的主要区别是什么。我理解的方式就像两者都是存储系统。它们的区别只是在访问数据方面。HBase通过非关系型数据库访问数据,HDFS使用计算框架(MapReduce)处理数据。如果是这种情况,为什么我们不能只有一个存储HDFS或HBase。根据需求,他们将插入和插

  • 我正在尝试将文本文件复制到hdfs位置。 我面临访问问题,所以我尝试更改权限。 但我无法更改以下相同的面临错误: