在测试生产用例时,我创建并保存了(使用Hive Metastore)这样的表:
table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets
table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets
我正在运行这样一个查询(伪代码)
table1.join(table2, [“key1”, “key2”])
.groupBy(“value2”)
.countUnique(“key1”)
常识认为,这种连接应该简单地用没有交换的排序合并连接来完成;不管spark如何进行交换,然后加入。
即使对于这个特定的用例,我也可以同时使用两个键,因为其他一些用例我需要按键1进行bucket。当我使用单个键进行(更简单)连接时,如下所示:
table1.join(table2, [“key1”])
它按预期工作(即排序合并连接,没有交换)。
现在我在这些表上有了一个优化连接,如果我想过滤,如下所示:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
它恢复到交换,然后加入。
当连接键是 bucketBy 键的超集时,如何说服 Spark 不进行交换?
注意:
我知道的一个技巧是,如果我重写不等式检查,火花不会洗牌,而不是等式检查。
(x==y)也可以表示为(x
. filter(table1.col("key 2")
. filter(table1.col("key 2")
它将继续使用没有交换的排序-合并连接,但是这不是一个解决方案,这是一个黑客。
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
是通过 Join 推送谓词的优化器规则。~~
我们可以从优化器规则中排除此规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1.--conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. 在 spark-defaults .conf. 中添加属性。
3. 将 set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin
添加到用户代码中。
同样,这又是一个黑客。.
理想情况下,筛选器应通过联接向下推送,从而减少要联接的行数
更新:。
1.我错了下推。由于谓词有两个表中的列,因此不会有过滤器下推
2.当where子句有“非相等”谓词时,为什么SortMergeJoin(SMJ)不添加额外的交换
这是因为SMJ只能将基于等式的谓词作为连接条件org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply的一部分
并且确保负责添加交换的Requirements看到SMJ没有新的加入条件,并且输出分布已经被满足。< br > code:org . Apache . spark . SQL . execution . exchange . ensure requirements # ensureDistributionAndOrdering。< br> 3。添加执行equals的UDF或者将谓词表示为大于号和小于号的组合,哪个更有效?。< br >为了评估这一点,我使用,
val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen
a.UDF等于-涉及虚函数调用的额外开销。< br> b .小于和大于的组合-没有虚函数调用。一旦我们找到了一个连接的行(使用key1),代码就会逐个检查其他谓词。
从上述 3 中的观察结果来看,使用基于非相等的谓词似乎更有效。
我正面临着同样的问题。似乎有一个公关完成,解决这个问题
(PR)https://github.com/apache/spark/pull/19054
https://issues.apache.org/jira/browse/SPARK-18067(吉拉票)
但我本来希望它已经被包含(我正在使用 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。
感谢使用不等式运算符的“黑客”,虽然感觉不太好,但这是一个简单的解决方法。我也会尝试用PR中的解决方案修补我的火花版,但如果我想分享我的代码,这就不太可持续/可复制了。
根据一些研究和探索,这似乎是最不黑客的解决方案:
基于此示例:
table1.join(table2, [“key1”])
.filter(table1.col(“key2”) == table2.col(“key2”))
不使用Spark中的< code>equalTo (==),实现一个自定义的< code>MyEqualTo(通过委托给spark EqualTo
实现就可以)似乎可以解决这个问题。这样spark就不会优化(!)连接,它会将过滤器拉至SortMergeJoin。
类似地,连接条件也可以这样形成:
(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
**dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)
使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。
问题内容: 区别在于,我的值是列表,而不仅仅是单个值。 因此,我想转: 变成: 假设我很高兴自己的价值观是独一无二的。 问题答案: 您可以使用字典理解和方法。
因此,如果键被旋转,我不能“更新”DocumentClient,因为它是一个单例。显然,重新启动应用程序是可行的,但我想避免这样做的要求。 那么,对如何实现两者有什么想法吗?我是不是漏掉了一些明显的东西?
Cassandra 2.1,Spark 1.1,Spark-Cassandra-Connector 1.1
假设我有以下基类: 如果我想编写一个类,它将覆盖< code>f()并且不允许覆盖它的派生类,可以使用以下方法编写它: 方法一: 方法2: < code >方法1非常详细,而< code >方法2更加紧凑,并且< code>final仍然表示< code>f()实际上是虚拟的,并且覆盖了基类方法。 哪一种方法更合适?