我正在使用下面的代码将43列和大约2,000,000行的DataFrame写入SQL Server的表中:
dataFrame
.write
.format("jdbc")
.mode("overwrite")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", url)
.option("dbtable", tablename)
.option("user", user)
.option("password", password)
.save()
不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?
我尝试设置 rewriteBatchedStatements=true
谢谢。
我们求助于使用azure-sqldb-spark库,而不是使用Spark的默认内置导出功能。这个库给你一个bulkCopyToSqlDB
这是一个方法 真正的 批量插入,去 了很多
更快。它比内置功能使用起来不太实用,但是以我的经验,还是值得的。
我们或多或少地像这样使用它:
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._
val options = Map(
"url" -> "***",
"databaseName" -> "***",
"user" -> "***",
"password" -> "***",
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)
// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)
val bulkConfig = Config(options ++ Map(
"dbTable" -> "myTable",
"bulkCopyBatchSize" -> "20000",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkConfig)
如您所见,我们CREATE TABLE
自己生成查询。您 可以
让该库创建表,但是这样做dataFrame.limit(0).write.sqlDB(config)
仍然会非常低效,可能需要您缓存您的DataFrame
,并且不允许您选择SaveMode
。
也可能很有趣:ExclusionRule
在将此库添加到sbt构建中时,我们必须使用an ,否则assembly
任务将失败。
libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
ExclusionRule(organization = "org.apache.spark")
)
我试图将写入并关注了其他几个博客,其中一个就是这个,但它不起作用。 但是,我可以成功地从读取数据为。此外,一些帖子使用了格式,其他帖子使用了格式。我不确定该用哪一个。;;和从这里开始。 守则如下: 这里有个例外: 在org.apache.hadoop.hbase.security.userprovider.instantiate(userprovider.java:122)在org.apache.
在output.csv的第2行,转义字符和引号(“”)一起丢失了。我的要求是在output.csv中也保留转义字符。任何形式的帮助都将非常感谢。 提前谢了。
我正在尝试将 json 文件读入 Spark 数据帧,但我将整个文件作为一行和一列获取,我试图将其拆分为多列: 这是我运行这行代码得到的输出: 这是已创建的数据帧的架构: 我的目标是将图像中的每个对象放在一行和分隔的列中,这意味着例如“Z4ah9SemQjX2cKN187pX”,其值为:艺术家,created_at…在第一行,“Z552dVXF5vp80bAajYrn”在第二行等。 我很陌生,可以
我正在使用从JSON事件流转换而来的Dataframes处理事件,这些数据流最终被写为Parquet格式。 但是,有些JSON事件在键中包含空格,我希望在将数据帧转换为Parquet之前记录并过滤/删除这些事件,因为被认为是Parquet模式(CatalystSchemaConverter)中的特殊字符,如下面[1]所列,因此列名中不应允许。 我如何在Dataframe中对列名进行这样的验证,并在
我有一个Spark作业,它使用以下分组查询。我知道group by是邪恶的,但在我的用例中我无法避免它。我尝试使用数据帧和hiveContext来使用它。sql()但这两种方法都会洗牌大量数据,而且速度非常慢:一个查询大约需要5分钟。我曾看到一组按阶段执行20 GB的随机读取和10 GB的随机写入。我有大约8个字段按字段分组传入 或 我已经尝试了几乎所有的调谐参数,如钨丝、lz4等。洗牌memor