当前位置: 首页 > 知识库问答 >
问题:

Spark使用ur命令读取jdbc db2

郑博
2023-03-14

我正在尝试使用read.jdbc从Db2中提取数据以实现spark。无法在查询中传递UR字符串。

如何在spark jdbc读取中将隔离设置为UR。

import json
#spark = SparkSession.builder.config('spark.driver.extraClassPath', '/home/user/db2jcc4.jar').getOrCreate()
jdbcUrl = "jdbc:db2://{0}:{1}/{2}".format("db2.1234.abcd.com", "3910", "DSN")
connectionProperties = {
  "user" : "user1",
  "password" : "password1",
  "driver" : "com.ibm.db2.jcc.DB2Driver",
  "fetchsize" : "100000"
}
pushdown_query = "(SELECT T6.COLUMN1, T6.COLUMN2 ,TO_DATE('07/11/2019 10:52:24', 'MM/DD/YYYY HH24:MI:SS') AS INSERT_DATE FROM DB1.T6 WITH UR ) ALIAS"
print(jdbcUrl)
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, column="COLUMN1", lowerBound=1, upperBound=12732076, numPartitions=5, properties=connectionProperties)

这将失败,错误:com.ibm.DB2.jcc.am.sqlsyntaxerrorexception:DB2 SQL error:sqlcode=-199,sqlstate=42601,sqlerrmc=ur;fetch,)OFFSET LIMIT INTERSECT ORDER GROUP WHERE HAVING JOIN,driver=4.13.80

jdbc中有connection参数,但该参数仅适用于编写isolationLevel(事务隔离级别),它适用于当前连接。它可以是NONE、READ_COMMITTED、READ_UNCOMMITTED、REPEATABLE_READ或SERIALIZABLE之一,对应于JDBC的Connection对象定义的标准事务隔离级别,默认为READ_UNCOMMITTED。此选项仅适用于写作。请参阅java.sql.Connection中的文档。

下面的内容能奏效吗?

connectionProperties = {
      "user" : "user1",
      "password" : "password1",
      "driver" : "com.ibm.db2.jcc.DB2Driver",
      "fetchsize" : "100000",
"isolationLevel" : "READ_UNCOMMITTED" 
    }

共有1个答案

唐宇定
2023-03-14

根据文档和本博客,在读取操作中会忽略isolationLevel。

老实说,我不明白为什么,因为java.sql.connection setIsolationLevel为整个连接设置了默认值,而read本身并没有设置isolationLevel。

然而,这里提供了一种不同的方法。

#spark = SparkSession.builder.config('spark.driver.extraClassPath', '/home/user/db2jcc4.jar').getOrCreate()
jdbcUrl = "jdbc:db2://{0}:{1}/{2}".format("db2.1234.abcd.com", "3910", "DSN")
connectionProperties = {
  "user" : "user1",
  "password" : "password1",
  "driver" : "com.ibm.db2.jcc.DB2Driver",
  "fetchsize" : "100000"
}

df = spark.read.jdbc(url=jdbcUrl, table="DB1.T6", predicates=["1=1 WITH UR"], properties=connectionProperties).select("COLUMN1", "COLUMN2", ...)

我使用1=1子句创建了一个有效的where条件。这个摊派看起来,好像一定有一个更干净的方法,但它的工作很好

 类似资料:
  • 我正在通过Spark使用以下命令读取csv文件。 我需要创建一个Spark DataFrame。 我使用以下方法将此rdd转换为spark df: 但是在将rdd转换为df时,我需要指定df的模式。我试着这样做:(我只有两列文件和消息) 然而,我得到了一个错误:java。lang.IllegalStateException:输入行没有架构所需的预期值数。需要2个字段,但提供1个值。 我还尝试使用以

  • 我想在Java使用JNA调用Linux mount命令,并从调用结果填充一个装入点列表,但无法理解接口方法的实际返回类型应该是什么。 如果我使用int,那么它将没有任何错误地打印-1。我认为这是某种错误的迹象。 } 我尝试使用基于以下文档的不同返回类型,但没有任何工作。 默认类型映射 我想我的问题是基于错误的签名 我的库有时会导致VM崩溃:仔细检查导致崩溃的方法的签名,以确保所有参数的大小和类型都

  • 我正在尝试使用spack-csv从spack-shell中的aws s3读取csv。 下面是我所做的步骤。使用下面的命令启动spack-shell 箱子/火花壳——包装com。数据块:spark-csv\u 2.10:1.2.0 在shell中,执行以下scala代码 获取以下错误 我在这里错过了什么?请注意,我可以使用 同样的scala代码在databricks笔记本中也可以正常工作 在spar

  • 我在尝试使用Spark简单读取CSV文件时遇到了这个问题。在这样的操作之后,我想确保: 数据类型是正确的(使用提供的模式) 根据提供的架构,标头是正确的 这是我使用的代码,并且有问题: 类型为产品类型,即案例类。这是可行的,但它不会检查列名是否正确,因此我可以提供另一个文件,只要数据类型正确,就不会发生错误,而且我不知道用户提供了错误的文件,但由于某种程度上的巧合,正确的数据类型具有正确的顺序。

  • 我正在尝试使用spark阅读Kafka,但我想我会遇到一些图书馆相关的问题。 线程“main”org.apache.spark.sql.AnalysisException中出现异常:找不到数据源:Kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;在org.apache.spark.sql.execution.datasources.datasource$.lookup

  • 外部读取配置命令 可以写到一个文件中,用 Mininet 直接调用。例如脚本文件名为 my_cli_script,则可以执行 mininet> source my_cli_script 或者 # mn --pre my_cli_script