当前位置: 首页 > 面试题库 >

Spark:优化将DataFrame写入SQL Server

尉迟俊能
2023-03-14
问题内容

我正在使用下面的代码将43列和大约2,000,000行的DataFrame写入SQL Server的表中:

dataFrame
  .write
  .format("jdbc")
  .mode("overwrite")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("url", url)
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .save()

不幸的是,尽管它确实适用于小型DataFrame,但它要么非常慢,要么对于大型DataFrame超时。关于如何优化它的任何提示?

我尝试设置 rewriteBatchedStatements=true

谢谢。


问题答案:

我们求助于使用azure-sqldb-spark库,而不是使用Spark的默认内置导出功能。这个库给你一个bulkCopyToSqlDB这是一个方法 真正的 批量插入,去 了很多
更快。它比内置功能使用起来不太实用,但是以我的经验,还是值得的。

我们或多或少地像这样使用它:

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
import com.microsoft.azure.sqldb.spark.query._

val options = Map(
  "url"          -> "***",
  "databaseName" -> "***",
  "user"         -> "***",
  "password"     -> "***",
  "driver"       -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
)

// first make sure the table exists, with the correct column types
// and is properly cleaned up if necessary
val query = dropAndCreateQuery(df, "myTable")
val createConfig = Config(options ++ Map("QueryCustom" -> query))
spark.sqlContext.sqlDBQuery(createConfig)

val bulkConfig = Config(options ++ Map(
  "dbTable"           -> "myTable",
  "bulkCopyBatchSize" -> "20000",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkConfig)

如您所见,我们CREATE TABLE自己生成查询。您 可以
让该库创建表,但是这样做dataFrame.limit(0).write.sqlDB(config)仍然会非常低效,可能需要您缓存您的DataFrame,并且不允许您选择SaveMode

也可能很有趣:ExclusionRule在将此库添加到sbt构建中时,我们必须使用an ,否则assembly任务将失败。

libraryDependencies += "com.microsoft.azure" % "azure-sqldb-spark" % "1.0.2" excludeAll(
  ExclusionRule(organization = "org.apache.spark")
)


 类似资料:
  • 我试图将写入并关注了其他几个博客,其中一个就是这个,但它不起作用。 但是,我可以成功地从读取数据为。此外,一些帖子使用了格式,其他帖子使用了格式。我不确定该用哪一个。;;和从这里开始。 守则如下: 这里有个例外: 在org.apache.hadoop.hbase.security.userprovider.instantiate(userprovider.java:122)在org.apache.

  • 在output.csv的第2行,转义字符和引号(“”)一起丢失了。我的要求是在output.csv中也保留转义字符。任何形式的帮助都将非常感谢。 提前谢了。

  • 我正在尝试将 json 文件读入 Spark 数据帧,但我将整个文件作为一行和一列获取,我试图将其拆分为多列: 这是我运行这行代码得到的输出: 这是已创建的数据帧的架构: 我的目标是将图像中的每个对象放在一行和分隔的列中,这意味着例如“Z4ah9SemQjX2cKN187pX”,其值为:艺术家,created_at…在第一行,“Z552dVXF5vp80bAajYrn”在第二行等。 我很陌生,可以

  • 我正在使用从JSON事件流转换而来的Dataframes处理事件,这些数据流最终被写为Parquet格式。 但是,有些JSON事件在键中包含空格,我希望在将数据帧转换为Parquet之前记录并过滤/删除这些事件,因为被认为是Parquet模式(CatalystSchemaConverter)中的特殊字符,如下面[1]所列,因此列名中不应允许。 我如何在Dataframe中对列名进行这样的验证,并在

  • 我有一个Spark作业,它使用以下分组查询。我知道group by是邪恶的,但在我的用例中我无法避免它。我尝试使用数据帧和hiveContext来使用它。sql()但这两种方法都会洗牌大量数据,而且速度非常慢:一个查询大约需要5分钟。我曾看到一组按阶段执行20 GB的随机读取和10 GB的随机写入。我有大约8个字段按字段分组传入 或 我已经尝试了几乎所有的调谐参数,如钨丝、lz4等。洗牌memor