当前位置: 首页 > 知识库问答 >
问题:

使用phoenix连接器将Spark dataframe写入Hbase

谭泳
2023-03-14
sql.write().format("org.apache.phoenix.spark")
    .mode(SaveMode.Overwrite).option("table", targetTable)
    .option("zkUrl", "localhost:2181:/hbase-unsecure)
    .insertInto(targetTable);
java.lang.NullPointerException
at org.apache.phoenix.hive.PhoenixStorageHandler.configureJobProperties(PhoenixStorageHandler.java:185)
at org.apache.phoenix.hive.PhoenixStorageHandler.configureOutputJobProperties(PhoenixStorageHandler.java:130)
at org.apache.spark.sql.hive.HiveTableUtil$.configureJobPropertiesForStorageHandler(TableReader.scala:324)
at org.apache.spark.sql.hive.SparkHiveWriterContainer.<init>(hiveWriterContainers.scala:67)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:226)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:142)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:310)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:259)
at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:239)
at com.lti.unitrax.data.load.IncrementalHiveTableLoadUnitraxMain.fullDataLoad(IncrementalHiveTableLoadUnitraxMain.java:166)
at com.lti.unitrax.data.load.TestDataLoad.main(TestDataLoad.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)

共有1个答案

夹谷腾
2023-03-14

我知道我在比赛中迟到了,但看到了这个帖子。以为我的答案能帮到别人。

以下是我在实现中所做的工作

  1. df=dataframe
  2. zookeeperurl=集群的zookeeper URL
  3. _tgttable=要在其中写入数据的表
df.write.format("org.apache.phoenix.spark")
  .mode(org.apache.spark.sql.SaveMode.Overwrite)
  .options(collection.immutable.Map("zkUrl" -> zookeeperURL, "table" -> _tgtTable)) 
  .save()
 类似资料:
  • 我在伪分布式设置中使用Hbase 1.2.6、Phoenix-4.13.1-Hbase 1.2、hadoop 2.9.0。我可以在hbase shell中创建表,我想使用phoenix来读取那些表。运行'sqlline.py localhost:2181:/hbase-unsecure'时出现以下错误: hbase-site.xml文件:

  • 我们最近开始了使用Scala、Spark和Cassandra的大数据项目,我对所有这些技术都是新手。我试图做简单的任务写到和读从卡桑德拉表。如果将属性名和列名都保留为小写或snake大小写(unserscores)就可以实现这一点,但我希望在scala代码中使用camel大小写。在Scala中使用camel case格式,在Cassandra中使用snake case格式,有没有更好的方法来实现这

  • 我的要求是尽可能的实时,这似乎离得很远。生产环境大约每3秒有400个事件。 是否需要对Cassandra中的YAML文件进行调优,或者对cassandra-connector本身进行任何更改

  • 在中有一个,可以将日志写入到。 我需要在中使用相同的功能,但我还没有找到这样做的选项。有人知道如何使用实现同样的效果吗?

  • 我正在使用pyspark和spark-cassandra-connector_2.11-2.3.0.jar与cassandra DB。我正在从一个密钥空间读取数据帧并写入另一个不同的密钥空间。这两个密钥空间具有不同的用户名和密码。 我使用以下方法创建了 sparkSession: 我使用此 SparkSession 将数据作为数据帧读取为: 我可以使用上述会话读取数据。spark_session附