import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast

val outputTableName = "test3"
val conf2 = HBaseConfiguration.create()
conf2.set("hbase.zookeeper.quorum", "xx.xx.xx.xx")
conf2.set("hbase.mapred.outputtable", outputTableName)
conf2.set("mapreduce.outputformat.class", "org.apache.hadoop.hbase.mapreduce.TableOutputFormat")
val job = createJob(outputTableName, conf2)
val outputTable = sc.broadcast(outputTableName)
val hbasePuts = simpleRdd.map(k => convertToPut(k, outputTable))
hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration)
def createJob(table: String, conf: Configuration): Job = {
  conf.set(TableOutputFormat.OUTPUT_TABLE, table)
  val job = Job.getInstance(conf, this.getClass.getName.split('$')(0))
  job.setOutputFormatClass(classOf[TableOutputFormat[String]])
  job
}
This function converts the data into HBase format:

def convertToPut(k: (String, String, String), outputTable: Broadcast[String]): (ImmutableBytesWritable, Put) = {
  val rowkey = k._1
  val put = new Put(Bytes.toBytes(rowkey))
  val one = Bytes.toBytes("cf1")
  val two = Bytes.toBytes("cf2")
  put.addColumn(one, Bytes.toBytes("a"), Bytes.toBytes(k._2))
  put.addColumn(two, Bytes.toBytes("a"), Bytes.toBytes(k._3))
  (new ImmutableBytesWritable(Bytes.toBytes(outputTable.value)), put)
}
This is the error I get at line 125, hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration):
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:122)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:177)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1099)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
at ScalaSpark$.main(ScalaSpark.scala:125)
at ScalaSpark.main(ScalaSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I ran into the same problem, and I think there is a bug in the org.apache.hadoop.hbase.mapreduce.TableOutputFormat class: checkOutputSpecs reads the configuration through getConf(), but Spark does not call setConf() on the output format instance it creates, so the configuration is still null when ConnectionFactory.createConnection() is invoked, which is where the NullPointerException in UserProvider.instantiate comes from.
The original checkOutputSpecs code in TableOutputFormat is:
public void checkOutputSpecs(JobContext context) throws IOException,
    InterruptedException {
  try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
    TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
    if (!admin.tableExists(tableName)) {
      throw new TableNotFoundException("Can't write, table does not exist:" +
          tableName.getNameAsString());
    }
    if (!admin.isTableEnabled(tableName)) {
      throw new TableNotEnabledException("Can't write, table is not enabled: " +
          tableName.getNameAsString());
    }
  }
}
If I fix it as follows, the problem goes away:
public void checkOutputSpecs(JobContext context) throws IOException,
    InterruptedException {
  // set conf from the context parameter
  setConf(context.getConfiguration());
  try (Admin admin = ConnectionFactory.createConnection(getConf()).getAdmin()) {
    TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
    if (!admin.tableExists(tableName)) {
      throw new TableNotFoundException("Can't write, table does not exist:" +
          tableName.getNameAsString());
    }
    if (!admin.isTableEnabled(tableName)) {
      throw new TableNotEnabledException("Can't write, table is not enabled: " +
          tableName.getNameAsString());
    }
  }
}
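Patching TableOutputFormat means rebuilding HBase, which is rarely convenient. A lighter-weight option is to apply the same fix in a small subclass and point the job at it. This is only a sketch, assuming your HBase version lets you override checkOutputSpecs; the class name FixedTableOutputFormat is made up for the example:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical wrapper: copy the configuration from the JobContext before the
// parent class tries to open an HBase connection with a null conf.
class FixedTableOutputFormat[K] extends TableOutputFormat[K] {
  override def checkOutputSpecs(context: JobContext): Unit = {
    if (getConf == null) {
      setConf(context.getConfiguration)
    }
    super.checkOutputSpecs(context)
  }
}

You would then register it in createJob with job.setOutputFormatClass(classOf[FixedTableOutputFormat[String]]) instead of the stock class.
Another way to avoid the failing check is to turn off Spark's output-spec validation, so that checkOutputSpecs is never invoked at all: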
val session = SparkSession.builder()
  .config("spark.hadoop.validateOutputSpecs", false)
  .getOrCreate()
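The same setting can also be passed at submit time rather than in code, for example:

spark-submit --conf spark.hadoop.validateOutputSpecs=false ...

Keep in mind that skipping validation means a missing or disabled table will only surface later, when the actual writes fail.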