我试图从mysql读取数据,并将其写回s3中的parquet文件,具体分区如下:
df=sqlContext.read.format('jdbc')\
.options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
dbtable='tbl',
numPartitions=4 )\
.load()
df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
我的问题是,它只打开一个到mysql的连接(而不是4个),并且在从mysql获取所有数据之前,它不会写入parquert,因为mysql中的表很大(100M行),进程在OutOfMemory上失败。
有没有办法将Spark配置为打开多个到mysql的连接并将部分数据写入镶木地板?
为了火花
...
private val dbUrl =
s"""jdbc:mysql://${host}:${port}/${db_name}
|?zeroDateTimeBehavior=convertToNull
|&read_buffer_size=100M""".stripMargin.replace("\n", "")
def run(sqlQuery: String): DataFrame = {
println(sqlQuery)
Datapipeline.spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", dbUrl)
.option("user", user)
.option("password", pass)
.option("dbtable", s"($sqlQuery) as tmp")
.load()
}
...
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = {
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${
if (whereClause.length > 0) s" where $whereClause"
}"
val df = run(sql).collect()(0)
Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int])
}
def getTableFields(table: String): String = {
val sql =
s"""
|SELECT *
|FROM information_schema.COLUMNS
|WHERE table_name LIKE '$table'
| AND TABLE_SCHEMA LIKE '${db_name}'
|ORDER BY ORDINAL_POSITION
""".stripMargin
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ")
}
/**
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in
* <numPartitions> for a <table> with WHERE clause
* @param table - a table name
* @param whereClause - WHERE clause without "WHERE" key word
* @param partitionColumn - column name used for partitioning, should be numeric
* @param numPartitions - number of partitions
* @return - a DataFrame
*/
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = {
val bounds = getBounds(table, whereClause, partitionColumn)
val fields = getTableFields(table)
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions)
val lowerBound = bounds(0)
val partitionRange: Int = ((bounds(1) - bounds(0)) / numPartitions)
for (i <- 0 to numPartitions - 2) {
dfs(i) = run(
s"""select $fields from $table
| where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${
if (whereClause.length > 0)
s" and $whereClause"
}
""".stripMargin.replace("\n", ""))
}
dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${
if (whereClause.length > 0)
s" and $whereClause"
}".replace("\n", ""))
dfs.reduceLeft((res, df) => res.union(df))
}
上次运行
方法将创建许多必要的分区。调用操作方法时,Spark 将创建与为 run
方法返回的数据帧定义的分区数一样多的并行任务。
享受吧。
您应该设置这些属性:
partitionColumn,
lowerBound,
upperBound,
numPartitions
如本文所述:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-到其他数据库
问题内容: 我对此进行了一些讨论,但还不太了解正确的解决方案:我想将S3中的数百个文件加载到RDD中。这是我现在的做法: 在不使用实际的阅读客户端: 我从在Scala中针对相同问题看到的答案中“翻译”了一下。我认为也可以将整个路径列表传递给,但是我不确定哪种是最佳做法。 问题答案: 根本的问题是,在s3中列出对象的速度确实很慢,并且每当执行树遍历时,看起来像目录树的方式都会降低性能(就像路径的通配
任何帮助都将非常感谢。
问题内容: 我在通过火花流从天蓝色斑点读取数据时遇到问题 上面的代码适用于HDFS,但无法从Azure blob读取文件 上面是在azure UI中显示的路径,但是这行不通,我是否丢失了某些内容,以及如何访问它。 我知道Eventhub是流数据的理想选择,但是我目前的情况要求使用存储而不是队列 问题答案: 为了从Blob存储中读取数据,需要完成两件事。首先,您需要告诉Spark在基础Hadoop配
问题内容: 是否可以将数据从Microsoft Sql Server(以及oracle,mysql等)读取到Spark应用程序中的rdd中?还是我们需要创建一个内存中的集合并将其并行化为RDD? 问题答案: 从邮件列表中找到了解决方案。可以使用JdbcRDD完成此操作。我需要获取MS Sql Server JDBC驱动程序jar并将其添加到项目的lib中。我想使用集成安全性,因此需要将sqljdb
我遇到了一个挑战,我必须读取CSV文件并将其读取,直到定义的可变大小限制(BATCH_SIZE)。读取 CSV 中的行数后,将其发送到不同的 AWS API。由于我的CSV文件大小可以是1Gb到2Gb的任何地方,因此我避免使用JSR223 CSV文件读取。我想知道如何使用JMeter和CSV数据集配置来实现它。
在scala火花数据帧中是否有的替代方案。我想从火花数据帧的列中选择特定的行。例如,在R等效代码中的第100行