当前位置: 首页 > 知识库问答 >
问题:

Spark a query from PostgreSQL(JDBC)中的分区

秦学林
2023-03-14

我正在集群模式下运行spark(使用pyspark)并通过JDBC从RDBMS读取数据。我通过查询(而不是直接通过表)读取信息

我使用选项来分区,比如numPartitions、upperBound等。

sql = (select ... )

而且

df=spark
.read
.jdbc(url=jdbcUrl, table=sql, 
properties=connectionProperties, column="brand_id", lowerBound=1, 
upperBound=12000,numPartitions=10000 )

不幸的是,在生成查询的末尾,在WHERE子句上触发转换分区选项,因此PostGreSQL读取full表full而不使用索引!

我有一个这样的问题

SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM 
  ( select  ... ) 
tab WHERE brand_id >= 5 AND brand_id < 6  

共有1个答案

张伯寅
2023-03-14

你试图做的事情在目前的Spark版本中似乎是不可能的。执行的查询构造为:

   val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
    stmt = conn.prepareStatement(sqlText,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

(请参阅org.apache.spark.sql.execution.datasources.jdbc.jdbcrdd#compute)

options.table对应于table属性中的(SELECT...)语句。

您能解释一下为什么需要从子查询加载列吗?如果在此子查询中进行联接或其他SQL操作,则始终可以“绕过”这些操作,并使用Spark SQL来完成这些操作(联接、SQL操作等等)。

编辑:

正如您所解释的,使用子查询的原因是JSONB提取。显然,它作为SQL本机操作会执行得更好,但是如果您想使用Spark来并行化您的处理,IMO,您需要在Spark级别声明JSON处理,如下所示:

CREATE TABLE jsonb_test (
  content jsonb
);

INSERT INTO jsonb_test (content) VALUES 
('{"first_name": "X", "last_name": "Y"}');

和代码:

val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
  "dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
  "driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
  StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
  .format("jdbc")
  .options(opts)
  .load()
  .withColumn("person", functions.from_json($"content", schema))

val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)

extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"

Spark支持贯穿PostgresDialect的JSONB字段,在其将DB类型转换为Catalyst类型的方法中,将JSONB视为StringType:

  private def toCatalystType(
  typeName: String,
  precision: Int,
  scale: Int): Option[DataType] = typeName match {
      case "bool" => Some(BooleanType)
      case "bit" => Some(BinaryType)
      case "int2" => Some(ShortType)
      case "int4" => Some(IntegerType)
      case "int8" | "oid" => Some(LongType)
      case "float4" => Some(FloatType)
      case "money" | "float8" => Some(DoubleType)
      case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
        Some(StringType)
 类似资料:
  • 问题内容: 我想使用JDBC实现分页。我想知道的实际情况是“如何分别从数据库中获取第1页和第2页的前50条记录,然后再获得50条记录” 我的查询是[数据表包含20,000行] 对于第1页,我可以获得50条记录,对于第2页,我想要获得下50条记录。如何在JDBC中有效地实现它? 我已经搜索过,发现这是跳过首页记录的方法,但是在大型结果集上花费一些时间,我不想花这么多时间。另外,我不想在查询中使用和+

  • 我正在尝试调用一个存储过程来在SQL表中创建一个项。运行代码时,运行存储过程并创建项,但抛出此错误。表中存储的值是正确的,generate键生成正确的键。 下面是我创建参数映射的create函数: }

  • 当使用Spark sql读取jdbc数据时,Spark默认只会启动1个分区。但是当表太大时,Spark读取速度会很慢。 我知道有两种方法可以制作分区: 1.在选项中设置分区列、lowerBound、upperBound和num分区; 2.在选项中设置偏移数组; 但我的情况是: 我的jdbc表没有INT列,或者列字符串可以很容易地用这两种方式的偏移量分隔。 这两种方法在我的情况下行不通,还有其他方法

  • 问题内容: 我在Java SE 7中制作了一个简单的JDBC程序,但在编译程序后却显示“ java.lang.ClassNotFoundException:org.postgreasql.Driver”错误,我点击了此链接http://docs.oracle.com/javase/7/ docs / technotes / guides / jdbc /, 但尚未获取,因此请帮忙。 问题答案: 可

  • 问题内容: 我在where子句中使用带有时间戳的PreparedStatement: 当我在客户端计算机上具有不同的时区时,得到的结果是不同的。这是Oracle jdbc中的错误吗?或正确的行为? Oracle数据库版本为10.2,并且我尝试使用Oracle jdbc瘦驱动程序版本10.2和11.1。 参数为时间戳,我希望途中不会进行任何时间转换。数据库列类型为DATE,但我还使用TIMESTAM