这个问题的目的是记录:
>
在PySpark中使用JDBC连接读写数据所需的步骤
JDBC源代码和已知解决方案可能存在的问题
只要稍作改动,这些方法就可以与其他受支持的语言(包括Scala和R)一起使用。
请参考此链接下载jdbc for postgres,并按照以下步骤下载jar文件
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.htmljar文件将像这样在路径中下载。"/home/anand/. ivy2/jars/org.postgresql_postgresql-42.1.1.jar"
如果你的火花版本是2
from pyspark.sql import SparkSession
spark = SparkSession.builder
.appName("sparkanalysis")
.config("spark.driver.extraClassPath",
"/home/anand/.ivy2/jars/org.postgresql_postgresql42.1.1.jar")
.getOrCreate()
//for localhost database//
pgDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:postgres") \
.option("dbtable", "public.user_emp_tab") \
.option("user", "postgres") \
.option("password", "Jonsnow@100") \
.load()
print(pgDF)
pgDF.filter(pgDF["user_id"]>5).show()
将文件另存为python并运行“python respectivefilename.py”
下载mysql连接器java驱动程序并保存在spark jar文件夹中,观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创建acotr1表结构
spark = SparkSession.builder.appName("prasadad").master('local').config('spark.driver.extraClassPath','D:\spark-2.1.0-bin-hadoop2.7\jars\mysql-connector-java-5.1.41-bin.jar').getOrCreate()
sc = spark.sparkContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/sakila",driver="com.mysql.jdbc.Driver",dbtable="actor",user="root",password="****").load()
mysql_url="jdbc:mysql://localhost:3306/sakila?user=root&password=****"
df.write.jdbc(mysql_url,table="actor1",mode="append")
>
bin/pyspark --packages group:name:version
或者将驱动程序类路径
和JAR
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
在JVM实例启动之前,也可以使用PYSPARK\u SUBMIT\u ARGS
环境变量或使用conf/spark默认值来设置这些属性。conf
设置火花。罐。包装
或火花。罐子
/火花。驾驶员extraClassPath
。
选择所需的模式。Spark JDBC writer支持以下模式:
append
:将这个类的内容:DataFrame
附加到存量数据。 覆盖:覆盖存量数据。
忽略
:如果数据已经存在,则静默忽略此操作。 错误
(默认情况):如果数据已经存在,则抛出异常。
不支持升级或其他细粒度修改
mode = ...
准备JDBC URI,例如:
# You can encode credentials in URI or pass
# separately using properties argument
# of jdbc method or options
url = "jdbc:postgresql://localhost/foobar"
(可选)创建JDBC参数字典。
properties = {
"user": "foo",
"password": "bar"
}
属性
/选项
也可用于设置受支持的JDBC连接属性。
使用数据帧。写jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
要保存数据(有关详细信息,请参阅pyspark.sql.DataFrameWriter)。
已知问题:
>
假设没有驱动程序版本不匹配来解决这个问题,您可以将驱动程序
类添加到属性
。例如:
properties = {
...
"driver": "org.postgresql.Driver"
}
使用df。写格式(“jdbc”)。选项(…)。save()
可能会导致:
JAVAlang.RuntimeException:org。阿帕奇。火花sql。处决数据源。jdbc。DefaultSource不允许以选择方式创建表。
解决方案未知。
在Pyspark 1.3中,您可以尝试直接调用Java方法:
df._jdf.insertIntoJDBC(url, "baz", True)
>
使用sqlContext。阅读jdbc
:
sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或者sqlContext。阅读格式(“jdbc”)
:
(sqlContext.read.format("jdbc")
.options(url=url, dbtable="baz", **properties)
.load())
已知问题和陷阱:
>
找不到合适的驱动程序-请参阅:写入数据
Spark SQL支持JDBC源的谓词下推,尽管并非所有谓词都可以下推。它也不授权限制或聚合。可能的解决方法是用有效的子查询替换dbtable
/table
参数。例如,见:
默认情况下,JDBC数据源使用单个执行器线程顺序加载数据。为了确保分布式数据加载,您可以:
列
(必须是整型
)、下边
、上边
、数值分区
见:
在分布式模式下(使用分区列或谓词),每个执行器在自己的事务中运行。如果同时修改源数据库,则无法保证最终视图的一致性。
>
Maven Repository(要获得-包
所需的坐标,请选择所需的版本,并从Gradle选项卡中复制数据,以表单compile-group: name: version
替换相应的字段)或Maven Central Repository:
根据数据库的不同,可能存在专门的源,并且在某些情况下首选:
问题内容: 这个问题的目的是记录: 在PySpark中使用JDBC连接读取和写入数据所需的步骤 JDBC源可能存在的问题以及已知的解决方案 只要稍作更改,这些方法就可以与其他支持的语言一起使用,包括Scala和R。 问题答案: 写数据 提交应用程序或启动Shell时,包括适用的JDBC驱动程序。您可以使用例如: 或结合和 也可以在启动JVM实例之前使用环境变量来设置这些属性,或使用set或/来设置
例如,我可以获得开始行和结束行,但是如何获得开始行和结束行之间的源代码呢。下面是示例代码。 我想得到下面的代码,这是与cmds相关的定义。
本文向大家介绍R使用NA值读取和写入数据,包括了R使用NA值读取和写入数据的使用技巧和注意事项,需要的朋友参考一下 示例 使用read.*函数读取表格数据集时,R自动查找看起来像的缺失值"NA"。但是,缺失值并不总是由表示NA。有时,点(.),连字符(-)或字符值(例如:)empty表示值是NA。该函数的na.strings参数read.*可用于告诉R需要将哪些符号/字符视为NA值: 还可能表明需
本文向大家介绍Php连接及读取和写入mysql数据库的常用代码,包括了Php连接及读取和写入mysql数据库的常用代码的使用技巧和注意事项,需要的朋友参考一下 既然现在你看到了这篇文章,说明你肯定知道PHP和MySQL是怎么一回事,我就不啰嗦了。但为什么你还要继续阅读此文呢?可能是以前你习惯复制粘贴一些代码,并没有真正弄懂代码的含义;也可能你以前弄懂了,但像我一样,有一段时间没有接触,生疏了;再或
我想通过将spark Java/Scala api转换为dll文件来运行C#中的apache spark源代码。我引用了IKVM/IKVMC将spark jar文件转换为dll文件,但无法得到正确的结果。有没有办法在C#中运行spark源?
我正在尝试使用read.jdbc从Db2中提取数据以实现spark。无法在查询中传递UR字符串。 如何在spark jdbc读取中将隔离设置为UR。 这将失败,错误:com.ibm.DB2.jcc.am.sqlsyntaxerrorexception:DB2 SQL error:sqlcode=-199,sqlstate=42601,sqlerrmc=ur;fetch,)OFFSET LIMIT