我使用Spark SQL v2。4.7关于EMR(含纱线)。我编写Spark Sql查询来执行转换。
估计复杂查询的最佳随机分区数:
我正在尝试估计需要设置的最佳随机分区数,以便为具有多个连接的复杂查询获得最佳性能。在Internet上我发现分区的最佳大小应该在10 MB
-100 MB
的范围内。现在,由于我知道这个值,我的下一步是计算查询的数据随机体积(以MB为单位),然后将其除以100
以获得随机分区数。但是,对于涉及多个与大表的连接的复杂查询,估计洗牌量变得非常困难。
那么如何估计洗牌量,从而估计大查询所需的最佳洗牌分区数?目前(经过大量搜索)我正在执行以下步骤-
scala> spark.sql("""
| create temp view initial_tbl
| as
| with first_qry as
| (
| select a.id,
| a.fund_id,
| b.fname,
| c.state_nm,
| row_number() over (partition by a.id order by c.eff_dt desc) as rownum
| from tblA a
| left join tblB b
| on a.id = b.id
| left join tblC c
| on b.st_id = c.st_id
| )
| select * from first_qry
| where rownum = 1
| """)
scala> spark.sql("""
| create temp view final_tbl as
| select a.id, a.fname, a.state_nm, b.salary, c.age
| from initial_tbl a
| left join fin_dtls b
| on a.id = b.id
| and a.fund_id = b.fund_id
| left join age_dtls c
| on b.age_id = c.age_id
| union all
| select id, fname, 'NA' as state_nm, salary, age
| from another_pre_created_tbl
| """)
scala> spark.sql("""
| select * from final_tbl
| limit 50
| """)
注意:这只是实际查询的简化版本。
好的,现在,我正在尝试估计此查询的数据大小,然后我可以将其除以100 MB
以获得查询的最佳随机分区数。
scala> val df = spark.read.table("final_tbl")
scala> println(df.queryExecution.analyzed.stats)
Statistics(sizeInBytes=34.5 GB, hints=none)
所以上面查询的大小是34.5 GB
,除以100 MB
给出~350
随机分区。现在设置configSETspark.sql.shuffle.partitions=350
后,我仍然看到查询很慢。所以我的问题是-
SKEW:
对于上面提到的查询,我看到12
作业是在SparkUI中触发的。在UI中,最后一个作业显示出高倾斜,即一个任务是一个长条形,其他同时进行的任务由几个非常小的条形表示(我希望我能提供UI截图)-所以,我的问题(基于上述)是-
写入输出时发生倾斜:
最后,我将上述查询的输出写入SQL API(%SQL%
)中的S3-
create table final_out
using parquet
options (
path 's3:/my/test/location/',
mode: 'overwrite'
)
as
select * from final_tbl
distribute by id;
即使是这样,当我检查UI时,我也会发现像上面一样的巨大倾斜,其中一个任务是非常长的条,而其他同时进行的任务是非常小的条。如果您仔细注意,您会发现上面显示的最终查询将所有与另一个具有硬编码值(即'NA'作为state_nm
)的查询联合起来。现在,由于1亿
记录在联合
-ed表中,值'NA'
成为输出中state_nm
列的主导值,从而产生倾斜,从而使写入非常慢。
所以我的最后一个问题是-
id
)上重新划分最终输出数据集,以强制在写入输出之前均匀分布记录,但没有成功-请注意上面给出的创建表语句末尾的按id分发部分我的群集配置如下:
Nodes: 20
Cores: 8
Memory: 64 GB
我非常抱歉发了这么长的帖子,但这些问题困扰了我很长时间。我在网上搜索了很多,但没有找到任何具体的答案。谁能帮我解决这些问题。非常感谢您的帮助。
谢谢
我不能回答你所有的问题,但我可以分享一些想法,因为我有一些问题:
如何确定上述查询的哪一部分,或者具体地说,这个大型复杂查询中的哪个表/列是导致倾斜的主要原因?
您可以列出查询中的所有表,对用于连接它们的列进行计数,并查看哪些值表示行的超大部分。如果您想自动化,请使用pandas profiler或great Expection库为每个表自动生成列的摘要。
我这样做对吗?否则,请让我知道如何计算复杂查询(涉及多个连接)的洗牌量,并最终能够计算任何给定复杂查询的最佳洗牌分区数。
我不确定在洗牌分区设置方面还有更多的工作要做,我想到的唯一一件事是在执行查询之前计算最大表的大小,并使用spark动态计算/估计洗牌分区数。conf.set(“spark.sql.shuffle.partitions”,calculatedNumber),但我不相信这会有帮助。
根据我的经验,更大的好处应该来自缓存多次使用的表,在运行查询之前广播较小的表并在连接列上对较大的数据框进行分区。
至于编写,我怀疑不是编写itsef会使过程变慢,而是在编写之前执行最终查询的整个计算(延迟执行),这需要大部分时间。
我实际上使用的是Spark SQL使用group by查询,我遇到了OOM问题。所以考虑增加火花的价值。sql。洗牌分区从默认的200到1000,但这没有帮助。 我相信这个分区会共享数据洗牌负载,所以分区越多,需要保存的数据就越少。我是新手。我使用的是Spark 1.4.0,我有大约1TB的未压缩数据要使用按查询分组。
我的理解是,在mapreduce编程模型中,我们有映射和减少是两个阶段。完成映射阶段后,将生成中间值(键、值),并将其传递给化简器。 我的怀疑是,在map()阶段之后,洗牌和排序将到来。所以,我觉得洗牌和排序是还原阶段的一部分,是真的吗? 如果是这样的话,组合器()是如何工作的?
问题内容: 我有以下表格,分别是BankDetails和Transactiondetails。使用这两个表,我想获得帐户名称的当前余额。 表格: 插入两个表的脚本: 输出将是这样的: 我需要使用以上两个表格输入帐户持有人姓名,帐户编号和当前余额。 下面是我的查询,我想获得优化的查询,即如果可能的话不使用子查询。 注意: 在我的情况下,贷方=添加到帐户中的金额,借方=从帐户中扣除的金额。 对于未遵循
问题内容: 看来在SQL中。我需要能够在严格的等效字符基础上进行区分。 IE 产量 等等.. 我查看了不同的排序规则,以查看是否有一个忽略字符扩展的单词,但到目前为止没有运气。 数据库具有默认设置- 我也尝试过和,但这两个都不起作用。 任何帮助,将不胜感激。 问题答案: 您需要使用二进制排序规则,以便区分大小写,区分重音等。 排序规则:
如果有人能用简单的术语回答这些与火花洗牌相关的问题,我将不胜感激。 在spark中,当加载一个数据集时,我们指定分区的数量,这表示输入数据(RDD)应该被划分为多少个块,并且根据分区的数量启动相等数量的任务(如果假设错误,请纠正我)。对于工作节点中的X个核心数。一次运行相应的X个任务。 沿着类似的思路,这里有几个问题。 因为,所有byKey操作以及联合、重新分区、连接和共组都会导致数据混乱。 >
问题内容: 我想知道,是否有某种方法可以在mysql / sql中改组字符串的字母,即类似 伪代码的内容 :? 从http://dev.mysql.com/doc/refman/5.0/en/string- functions.html 找不到任何内容,对其进行搜索似乎只是找到改组结果的解决方案,而不是字符串。 问题答案: 干得好: 有关输出,请参见sqlfiddle.com。 已使用mariad