我正在集群模式下运行spark(使用pyspark)并通过JDBC从RDBMS读取数据。我通过查询(而不是直接通过表)读取信息
我使用选项来分区,比如numPartitions、upperBound等。
sql = (select ... )
而且
df=spark
.read
.jdbc(url=jdbcUrl, table=sql,
properties=connectionProperties, column="brand_id", lowerBound=1,
upperBound=12000,numPartitions=10000 )
不幸的是,在生成查询的末尾,在WHERE子句上触发转换分区选项,因此PostGreSQL读取full表full而不使用索引!
我有一个这样的问题
SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM
( select ... )
tab WHERE brand_id >= 5 AND brand_id < 6
你试图做的事情在目前的Spark版本中似乎是不可能的。执行的查询构造为:
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
(请参阅org.apache.spark.sql.execution.datasources.jdbc.jdbcrdd#compute)
options.table
对应于table
属性中的(SELECT...)语句。
您能解释一下为什么需要从子查询加载列吗?如果在此子查询中进行联接或其他SQL操作,则始终可以“绕过”这些操作,并使用Spark SQL来完成这些操作(联接、SQL操作等等)。
编辑:
正如您所解释的,使用子查询的原因是JSONB提取。显然,它作为SQL本机操作会执行得更好,但是如果您想使用Spark来并行化您的处理,IMO,您需要在Spark级别声明JSON处理,如下所示:
CREATE TABLE jsonb_test (
content jsonb
);
INSERT INTO jsonb_test (content) VALUES
('{"first_name": "X", "last_name": "Y"}');
和代码:
val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
"dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
"driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
.format("jdbc")
.options(opts)
.load()
.withColumn("person", functions.from_json($"content", schema))
val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"
Spark支持贯穿PostgresDialect
的JSONB字段,在其将DB类型转换为Catalyst类型的方法中,将JSONB视为StringType
:
private def toCatalystType(
typeName: String,
precision: Int,
scale: Int): Option[DataType] = typeName match {
case "bool" => Some(BooleanType)
case "bit" => Some(BinaryType)
case "int2" => Some(ShortType)
case "int4" => Some(IntegerType)
case "int8" | "oid" => Some(LongType)
case "float4" => Some(FloatType)
case "money" | "float8" => Some(DoubleType)
case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
Some(StringType)
问题内容: 我想使用JDBC实现分页。我想知道的实际情况是“如何分别从数据库中获取第1页和第2页的前50条记录,然后再获得50条记录” 我的查询是[数据表包含20,000行] 对于第1页,我可以获得50条记录,对于第2页,我想要获得下50条记录。如何在JDBC中有效地实现它? 我已经搜索过,发现这是跳过首页记录的方法,但是在大型结果集上花费一些时间,我不想花这么多时间。另外,我不想在查询中使用和+
我正在尝试调用一个存储过程来在SQL表中创建一个项。运行代码时,运行存储过程并创建项,但抛出此错误。表中存储的值是正确的,generate键生成正确的键。 下面是我创建参数映射的create函数: }
当使用Spark sql读取jdbc数据时,Spark默认只会启动1个分区。但是当表太大时,Spark读取速度会很慢。 我知道有两种方法可以制作分区: 1.在选项中设置分区列、lowerBound、upperBound和num分区; 2.在选项中设置偏移数组; 但我的情况是: 我的jdbc表没有INT列,或者列字符串可以很容易地用这两种方式的偏移量分隔。 这两种方法在我的情况下行不通,还有其他方法
问题内容: 我在Java SE 7中制作了一个简单的JDBC程序,但在编译程序后却显示“ java.lang.ClassNotFoundException:org.postgreasql.Driver”错误,我点击了此链接http://docs.oracle.com/javase/7/ docs / technotes / guides / jdbc /, 但尚未获取,因此请帮忙。 问题答案: 可
问题内容: 我在where子句中使用带有时间戳的PreparedStatement: 当我在客户端计算机上具有不同的时区时,得到的结果是不同的。这是Oracle jdbc中的错误吗?或正确的行为? Oracle数据库版本为10.2,并且我尝试使用Oracle jdbc瘦驱动程序版本10.2和11.1。 参数为时间戳,我希望途中不会进行任何时间转换。数据库列类型为DATE,但我还使用TIMESTAM