为了在spark中加载和划分传入的数据,我使用了以下语法。
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
参数< code>partitionColumn 、< code>lowerBound 、< code>upperBound 、< code>numPartitions用于优化作业的性能。
我有一个包含 1000 条记录的表
当数据较少时,上述设计效果很好。但我有一个场景如下。
我有一个包含 2030 亿条记录的表,其中没有包含唯一/串行整数的整数列。然后有一个日期列,其数据分布在 5 年,即 2016-2021 年。为了更快地移动数据,我每次都会移动每年一个月的数据。这是我正在使用的查询:
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
因此,上面的查询变成:<code>select*from table where date_column
这是对我的循环的粗略描述:
(2016 to 2021) { year =>
(1 to 12) { month =>
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
}
}
为了找出界限,我使用了以下月份和年份的相同过滤器:
val bounds = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
.load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)
问题在于查找过滤数据的下限和上限。由于数据量巨大,运行最少的查询
我尝试在那里给出随机值,但是在任务运行时观察到分区中的数据倾斜。
必须给出then partitionColumn的最小值和最大值作为更好的数据分布的上下限?如果没有,有没有办法指定下限和上限,而不是运行min
任何帮助都非常感谢。
对于2000亿行,我希望您的表在您访问数据的同一日期列上在您的数据库中进行分区。没有它,查询将毫无希望。
但是,您是否尝试过日期/时间戳值的整数等效值的下限和上限?查看Spark将整数值转换为时间戳的参考资料。
JDBC选项lowerBound和upperBound转换为TimestampType/DateType值的方式与将字符串转换为TimestampType/DeteType值的方法相同。转换基于Proleptic Gregorian日历和SQL配置spark.SQL.session.timeZone定义的时区。在Spark 2.4及以下版本中,转换基于混合日历(朱利安·格里高利)和默认系统时区。
正如你提到的,没有预先存在的整数列可以在这里使用。所以在你的循环中,上限和下限是静态的,因此可以转换为静态的上限和下限数值。根据Spark的内部结构,下限和上限值被划分为数字范围,并向数据库抛出多个查询,以每次查询获取单个分区的数据。这也意味着在相关列上进行表分区或在源数据库中具有适当的索引对性能非常重要。
您需要确保上限和下限的占位符适当地放置在您提供的查询中。作为提醒;实际数值可能会因使用的数据库系统而异。如果弹出该场景,即数据库系统到日期的整数转换不同,那么您需要提供数据库而不是Spark接受的值。来自相同的文档:
参数:connectionFactory-返回打开的Connection的工厂。RDD负责关闭连接。sql-查询的文本。查询必须包含两个?用于划分结果的参数的占位符。例如
select title, author from books where ? <= id and id <= ?
lowerBound - the minimum value of the first placeholder
upperBound - the maximum value of the second placeholder The lower and upper bounds are inclusive.
...
同样,也可以清楚地看到
问题内容: 我有一个Windows 7系统,上面安装了最新的Java编译器。我也有最新的Cygwin。我想使用Cygwin外壳程序中的Java编译器。我在Cygwin中编辑了PATH变量,如下所示: 我可以在上面的目录中看到二进制文件,但是当我尝试编译我的* .java文件时,我得到了: 在像这样设置PATH变量时我做错了吗?我还需要做其他事情吗?我是Java新手,对cygwin不太熟悉。 问题答
问题内容: 我有一个字符串“ ”。我想查找一个单词在字符串中出现多少次。示例hello发生2次。我尝试了只打印字符的方法- 我想学习如何找到字数统计。 问题答案: 如果要查找单个单词的计数,请使用: 使用和汇总所有单词:
想在Android中实现这个动画。感谢任何帮助。
主要内容:一、用一个创业公司的发展作为背景引入,二、多台服务器分库支撑高并发读写,三、大量分表来保证海量数据下的查询性能,四、读写分离来支撑按需扩容以及性能提升,五、高并发下的数据库架构设计总结这篇文章,我们来聊一下对于一个支撑日活百万用户的高并系统,他的数据库架构应该如何设计? 看到这个题目,很多人第一反应就是: 分库分表啊! 但是实际上,数据库层面的分库分表到底是用来干什么的,他的不同的作用如何应对不同的场景,我觉得很多同学可能都没搞清楚。 一、用一个创业公司的发展作为背景引入 假如我们现在
我是新来的颤振,有谁能帮助我知道如何实现屏幕设计,如使用颤振截图所示。
问题内容: 我目前正在从事一个项目,涉及通过SNMP从打印机获取信息。现在,我一直在测试/研究的打印机是Lexmark X950。 我一直在努力的一个问题是,我也希望该程序也适用于HP或Kyocera或Brother打印机,但是我使用的OID似乎只能在Lexmark上使用。 这是我使用的一些OID: 如您在这里看到的,我主要将1.3.6.1.4.1.641用作我的OID,但它们仅适用于Lexmar