当前位置: 首页 > 知识库问答 >
问题:

spark从mysql并行读取数据

房冥夜
2023-03-14

我试图从mysql读取数据,并将其写回s3中的parquet文件,具体分区如下:

df=sqlContext.read.format('jdbc')\
   .options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
         dbtable='tbl',
         numPartitions=4 )\
   .load()


df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])

我的问题是,它只打开一个到mysql的连接(而不是4个),并且在从mysql获取所有数据之前,它不会写入parquert,因为mysql中的表很大(100M行),进程在OutOfMemory上失败。

有没有办法将Spark配置为打开多个到mysql的连接并将部分数据写入镶木地板?

共有2个答案

籍利
2023-03-14

为了火花

...
private val dbUrl =
s"""jdbc:mysql://${host}:${port}/${db_name}
    |?zeroDateTimeBehavior=convertToNull
    |&read_buffer_size=100M""".stripMargin.replace("\n", "")

def run(sqlQuery: String): DataFrame = {
println(sqlQuery)
Datapipeline.spark.read
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", dbUrl)
  .option("user", user)
  .option("password", pass)
  .option("dbtable", s"($sqlQuery) as tmp")
  .load()
}
...
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = {
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${
  if (whereClause.length > 0) s" where $whereClause"
}"
val df = run(sql).collect()(0)

Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int])
}

def getTableFields(table: String): String = {
val sql =
  s"""
     |SELECT *
     |FROM information_schema.COLUMNS
     |WHERE table_name LIKE '$table'
     |  AND TABLE_SCHEMA LIKE '${db_name}'
     |ORDER BY ORDINAL_POSITION
   """.stripMargin
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ")
}

/**
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in
* <numPartitions> for a <table> with WHERE clause
* @param table - a table name
* @param whereClause - WHERE clause without "WHERE" key word
* @param partitionColumn - column name used for partitioning, should be numeric
* @param numPartitions - number of partitions
* @return - a DataFrame
*/
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = {
val bounds = getBounds(table, whereClause, partitionColumn)

val fields = getTableFields(table)
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions)

val lowerBound = bounds(0)
val partitionRange: Int = ((bounds(1) - bounds(0)) / numPartitions)

for (i <- 0 to numPartitions - 2) {
  dfs(i) = run(
    s"""select $fields from $table
        | where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${
      if (whereClause.length > 0)
        s" and $whereClause"
    }
     """.stripMargin.replace("\n", ""))
}

dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${
  if (whereClause.length > 0)
    s" and $whereClause"
}".replace("\n", ""))

dfs.reduceLeft((res, df) => res.union(df))

}

上次运行方法将创建许多必要的分区。调用操作方法时,Spark 将创建与为 run 方法返回的数据帧定义的分区数一样多的并行任务。

享受吧。

齐鹏程
2023-03-14

您应该设置这些属性:

partitionColumn, 
lowerBound, 
upperBound, 
numPartitions

如本文所述:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-到其他数据库

 类似资料:
  • 问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配

  • 问题内容: 我在通过火花流从天蓝色斑点读取数据时遇到问题 上面的代码适用于HDFS,但无法从Azure blob读取文件 上面是在azure UI中显示的路径,但是这行不通,我是否丢失了某些内容,以及如何访问它。 我知道Eventhub是流数据的理想选择,但是我目前的情况要求使用存储而不是队列 问题答案: 为了从Blob存储中读取数据,需要完成两件事。首先,您需要告诉Spark在基础Hadoop配

  • 问题内容: 是否可以将数据从Microsoft Sql Server(以及oracle,mysql等)读取到Spark应用程序中的rdd中?还是我们需要创建一个内存中的集合并将其并行化为RDD? 问题答案: 从邮件列表中找到了解决方案。可以使用JdbcRDD完成此操作。我需要获取MS Sql Server JDBC驱动程序jar并将其添加到项目的lib中。我想使用集成安全性,因此需要将sqljdb

  • 我遇到了一个挑战,我必须读取CSV文件并将其读取,直到定义的可变大小限制(BATCH_SIZE)。读取 CSV 中的行数后,将其发送到不同的 AWS API。由于我的CSV文件大小可以是1Gb到2Gb的任何地方,因此我避免使用JSR223 CSV文件读取。我想知道如何使用JMeter和CSV数据集配置来实现它。

  • 在scala火花数据帧中是否有的替代方案。我想从火花数据帧的列中选择特定的行。例如,在R等效代码中的第100行