Reading Phoenix from Spark (phoenix-spark)

柴浩大
2023-12-01

Why not access Phoenix over JDBC?

The Phoenix documentation explains the reasons in detail (https://phoenix.apache.org/phoenix_spark.html): in short, Spark's built-in JDBC source can only parallelize a scan by range-partitioning a numeric column, while phoenix-spark reads in parallel using the splits Phoenix itself provides.
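For contrast, this is roughly what the JDBC route looks like; a minimal sketch, assuming the Phoenix JDBC driver is on the classpath, with the ZooKeeper quorum and table name as placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Without lowerBound/upperBound/numPartitions on a numeric column,
// Spark's JDBC source reads the whole table in a single task.
val jdbcDf = spark.read
  .format("jdbc")
  .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
  .option("url", "jdbc:phoenix:zk1,zk2,zk3:2181:/hbase")
  .option("dbtable", "TABLE1")
  .load()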

Why not the access pattern the official docs recommend?

The docs recommend:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)

val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)

df
  .filter(df("COL1") === "test_row_1" && df("ID") === 1L)
  .select(df("ID"))
  .show

In practice, though, sqlContext.load() has been deprecated since Spark 2.0, and this approach also has bugs that can leave Phoenix data unreadable.
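For reference, phoenix-spark also works through the Spark 2.x DataFrameReader API, which replaces sqlContext.load(); a minimal sketch (the server address and table name are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Spark 2.x replacement for the deprecated sqlContext.load() above
val df = spark.read
  .format("org.apache.phoenix.spark")
  .option("table", "TABLE1")
  .option("zkUrl", "phoenix-server:2181")
  .load()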

The right way to connect to Phoenix!!!

The main code is as follows:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._ // brings phoenixTableAsDataFrame into scope

val spark = SparkSession.builder()
  .master("local[*]")
  .getOrCreate()

// Load the Hadoop/HBase client configs so Phoenix can reach the cluster
val conf = new Configuration
conf.addResource("phoenix/core-site.xml")
conf.addResource("phoenix/hbase-site.xml")
conf.addResource("phoenix/hdfs-site.xml")

// Read the given columns of the Phoenix table as a DataFrame
val df = spark.sqlContext.phoenixTableAsDataFrame(
  "tableName",
  Seq("USER_ID"),
  conf = conf,
  zkUrl = Some("zk1,zk2,zk3:2181:/hbase")
)
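The returned DataFrame then behaves like any other; for example (the USER_ID value below is a placeholder):

// Standard DataFrame operations run against the Phoenix-backed data
df.filter(df("USER_ID") === "1001")
  .select(df("USER_ID"))
  .show()

// Or register it as a temp view and query it with Spark SQL
df.createOrReplaceTempView("phoenix_table")
spark.sql("SELECT COUNT(*) FROM phoenix_table").show()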

Complete code for connecting Spark to Phoenix

See my GitHub repo:
https://github.com/brain1234/phoenix-spark-demo.git
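To compile the code above, the phoenix-spark artifact must be on the classpath; a build.sbt sketch, with versions as assumptions that you should match to your cluster:

// build.sbt -- versions are placeholders, align them with your Spark/HBase install
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"     % "2.4.8" % Provided,
  "org.apache.phoenix" %  "phoenix-spark" % "5.0.0-HBase-2.0"
)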
