当前位置: 首页 > 面试题库 >

如何使用JDBC源在(Py)Spark中写入和读取数据?

柳坚白
2023-03-14
问题内容

这个问题的目的是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源可能存在的问题以及已知的解决方案

只要稍作更改,这些方法就可以与其他支持的语言一起使用,包括Scala和R。


问题答案:

写数据

  1. 提交应用程序或启动Shell时,包括适用的JDBC驱动程序。您可以使用例如--packages
    bin/pyspark --packages group:name:version
    

或结合driver-class-pathjars

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR

也可以PYSPARK_SUBMIT_ARGS在启动JVM实例之前使用环境变量来设置这些属性,或使用conf/spark- defaults.confsetspark.jars.packagesspark.jars/来设置这些属性spark.driver.extraClassPath

  1. 选择所需的模式。Spark JDBC编写器支持以下模式:
* `append`:将此:class:的内容追加`DataFrame`到现有数据中。
* `overwrite`:覆盖现有数据。
* `ignore`:如果数据已经存在,请静默忽略此操作。
* `error` (默认情况):如果数据已经存在,则引发异常。

不支持更新或其他细粒度的修改

    mode = ...
  1. 准备JDBC URI,例如:

    # You can encode credentials in URI or pass
    

    of jdbc method or options

    url = “jdbc:postgresql://localhost/foobar”

  2. (可选)创建JDBC参数字典。

    properties = {
    "user": "foo",
    "password": "bar"
    

    }

properties/options还可以用于设置支持的JDBC连接属性。

  1. 采用 DataFrame.write.jdbc
    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

保存数据(pyspark.sql.DataFrameWriter有关详细信息,请参阅)。

已知问题

  • 使用--packagesjava.sql.SQLException: No suitable driver found for jdbc: ...)包含驱动程序后,找不到合适的驱动程序

假设没有驱动程序版本不匹配可以解决此问题,则可以将driver类添加到中properties。例如:

    properties = {
    ...
    "driver": "org.postgresql.Driver"
}
  • 使用df.write.format("jdbc").options(...).save()可能会导致:

java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许创建表为select。

解决方案未知。

  • 在Pyspark 1.3中,您可以尝试直接调用Java方法:
    df._jdf.insertIntoJDBC(url, "baz", True)
    

读取数据

  1. 遵循 写数据中的 步骤1-4 __
  2. 用途sqlContext.read.jdbc
    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

sqlContext.read.format("jdbc")

    (sqlContext.read.format("jdbc")
    .options(url=url, dbtable="baz", **properties)
    .load())

已知问题和陷阱

  • 找不到合适的驱动程序-请参阅:写入数据
  • Spark SQL支持JDBC源的谓词下推,尽管并非所有谓词都可以下推。它也没有委派限制或聚合。可能的解决方法是用有效的子查询替换dbtable/table参数。
  • 默认情况下,JDBC数据源使用单个执行程序线程顺序加载数据。为确保分布式数据加载,您可以:

    • 提供分区column(必须IntegeTypelowerBoundupperBoundnumPartitions
    • 提供互斥谓词的列表,predicates每个所需分区一个。

看到:

* [通过JDBC从RDBMS读取时,对spark进行分区](https://stackoverflow.com/q/43150694/6910411),
* [从JDBC源迁移数据时如何优化分区?](https://stackoverflow.com/q/52603131/6910411),
* [如何使用DataFrame和JDBC连接提高慢速Spark作业的性能?](https://stackoverflow.com/q/32188295/6910411)
* [使用JDBC导入Postgres时如何对Spark RDD进行分区?](https://stackoverflow.com/q/39597971/6910411)
  • 在分布式模式(具有分区列或谓词)中,每个执行程序都在其自己的事务中运行。如果同时修改源数据库,则不能保证最终视图将保持一致。

在哪里找到合适的驱动程序:

  • Maven存储库(以获取用于--packages选择所需版本的所需坐标,并从Gradle选项卡中以compile-group:name:version替换各个字段的形式复制数据)或Maven Central存储库:

    • PostgreSQL的
    • 的MySQL

其他选择

根据数据库的不同,可能存在专门的来源,并且在某些情况下是首选的来源:

  • Greenplum-关键Greenplum-Spark连接器
  • Apache Phoenix- Apache Spark插件
  • Microsoft SQL Server- Azure SQL数据库和SQL Server的Spark连接器
  • Amazon Redshift- Databricks Redshift连接器(当前版本仅在专有Databricks Runtime中可用。已停产的开源版本,可在GitHub上获得)。


 类似资料:
  • 这个问题的目的是记录: > 在PySpark中使用JDBC连接读写数据所需的步骤 JDBC源代码和已知解决方案可能存在的问题 只要稍作改动,这些方法就可以与其他受支持的语言(包括Scala和R)一起使用。

  • 本文向大家介绍R使用NA值读取和写入数据,包括了R使用NA值读取和写入数据的使用技巧和注意事项,需要的朋友参考一下 示例 使用read.*函数读取表格数据集时,R自动查找看起来像的缺失值"NA"。但是,缺失值并不总是由表示NA。有时,点(.),连字符(-)或字符值(例如:)empty表示值是NA。该函数的na.strings参数read.*可用于告诉R需要将哪些符号/字符视为NA值: 还可能表明需

  • 我正在尝试使用read.jdbc从Db2中提取数据以实现spark。无法在查询中传递UR字符串。 如何在spark jdbc读取中将隔离设置为UR。 这将失败,错误:com.ibm.DB2.jcc.am.sqlsyntaxerrorexception:DB2 SQL error:sqlcode=-199,sqlstate=42601,sqlerrmc=ur;fetch,)OFFSET LIMIT

  • 我试图使用下面的代码将一个示例json文件读取到SqlContext中,但失败了,随后出现了datasource错误。 Java语言lang.ClassNotFoundException:未能找到数据源:json。请在以下位置查找包裹http://spark-packages.org位于组织。阿帕奇。火花sql。处决数据源。ResolvedDataSource美元。org上的lookUpdateS

  • 问题内容: 我已经编写了一个applet,并将其安装在智能卡中。但是我不知道如何在智能卡上读写数据? 从智能卡读取数据是否正确? 请告诉我如何使用javacard将数据写入智能卡。 问题答案: 我找到了解决方案。实际上我正在使用eclipse,因为其中安装了编辑器java卡插件。当我在智能卡上运行程序时,每次都会在以前的applet上安装最新的applet。要查看结果,我们可以使用pyapdu工具

  • 我使用Spark JDBC将数据摄取到Mysql表中。如果表不存在,它也会创建一个表。许多文本都有特殊字符。如果遇到任何特殊的食物,摄入就会失败。我通过在MySQL表中手动设置字符集utf8解决了这个问题。 是否可以在Spark JDBC中创建表时设置? 我使用DataFrames保存数据到MySQL。