当前位置: 首页 > 知识库问答 >
问题:

当连接键是 bucketBy 键的超集时,如何说服 Spark 不进行交换?

郑星辰
2023-03-14

在测试生产用例时,我创建并保存了(使用Hive Metastore)这样的表:

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

我正在运行这样一个查询(伪代码)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

常识认为,这种连接应该简单地用没有交换的排序合并连接来完成;不管spark如何进行交换,然后加入。

即使对于这个特定的用例,我也可以同时使用两个键,因为其他一些用例我需要按键1进行bucket。当我使用单个键进行(更简单)连接时,如下所示:

table1.join(table2, [“key1”])

它按预期工作(即排序合并连接,没有交换)。

现在我在这些表上有了一个优化连接,如果我想过滤,如下所示:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

它恢复到交换,然后加入。

当连接键是 bucketBy 键的超集时,如何说服 Spark 不进行交换?

注意:

我知道的一个技巧是,如果我重写不等式检查,火花不会洗牌,而不是等式检查。

(x==y)也可以表示为(x

. filter(table1.col("key 2")

. filter(table1.col("key 2")

它将继续使用没有交换的排序-合并连接,但是这不是一个解决方案,这是一个黑客。

共有3个答案

朱乐逸
2023-03-14

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin 是通过 Join 推送谓词的优化器规则。~~
我们可以从优化器规则中排除此规则。这样我们就不必对用户代码进行任何更改。
要排除,我们可以执行以下操作之一
1.--conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. 在 spark-defaults .conf. 中添加属性。
3. 将 set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin 添加到用户代码中。

同样,这又是一个黑客。.
理想情况下,筛选器应通过联接向下推送,从而减少要联接的行数

更新:。
1.我错了下推。由于谓词有两个表中的列,因此不会有过滤器下推
2.当where子句有“非相等”谓词时,为什么SortMergeJoin(SMJ)不添加额外的交换
这是因为SMJ只能将基于等式的谓词作为连接条件org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys#unapply的一部分

并且确保负责添加交换的Requirements看到SMJ没有新的加入条件,并且输出分布已经被满足。< br > code:org . Apache . spark . SQL . execution . exchange . ensure requirements # ensureDistributionAndOrdering。< br> 3。添加执行equals的UDF或者将谓词表示为大于号和小于号的组合,哪个更有效?。< br >为了评估这一点,我使用,

val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

a.UDF等于-涉及虚函数调用的额外开销。< br> b .小于和大于的组合-没有虚函数调用。一旦我们找到了一个连接的行(使用key1),代码就会逐个检查其他谓词。

从上述 3 中的观察结果来看,使用基于非相等的谓词似乎更有效。

都阳辉
2023-03-14

我正面临着同样的问题。似乎有一个公关完成,解决这个问题

(PR)https://github.com/apache/spark/pull/19054

https://issues.apache.org/jira/browse/SPARK-18067(吉拉票)

但我本来希望它已经被包含(我正在使用 Spark 3.0.0,问题仍然存在,而票证已于 2019 年 5 月 21 日解决,比 Spark3 发布早一年多)。

感谢使用不等式运算符的“黑客”,虽然感觉不太好,但这是一个简单的解决方法。我也会尝试用PR中的解决方案修补我的火花版,但如果我想分享我的代码,这就不太可持续/可复制了。

马飞
2023-03-14

根据一些研究和探索,这似乎是最不黑客的解决方案:

基于此示例:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

不使用Spark中的< code>equalTo (==),实现一个自定义的< code>MyEqualTo(通过委托给spark EqualTo实现就可以)似乎可以解决这个问题。这样spark就不会优化(!)连接,它会将过滤器拉至SortMergeJoin。

类似地,连接条件也可以这样形成:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
 类似资料:
  • **dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)

  • 使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。 以下是输入: ~10个不同大小的数据集,大部分是巨大的( 经过一番分析,我发现作业失败和缓慢的原因是歪斜键:当左侧有数百万条记录时,用连接键。 我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。 如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。

  • 问题内容: 区别在于,我的值是列表,而不仅仅是单个值。 因此,我想转: 变成: 假设我很高兴自己的价值观是独一无二的。 问题答案: 您可以使用字典理解和方法。

  • 因此,如果键被旋转,我不能“更新”DocumentClient,因为它是一个单例。显然,重新启动应用程序是可行的,但我想避免这样做的要求。 那么,对如何实现两者有什么想法吗?我是不是漏掉了一些明显的东西?

  • Cassandra 2.1,Spark 1.1,Spark-Cassandra-Connector 1.1

  • 假设我有以下基类: 如果我想编写一个类,它将覆盖< code>f()并且不允许覆盖它的派生类,可以使用以下方法编写它: 方法一: 方法2: < code >方法1非常详细,而< code >方法2更加紧凑,并且< code>final仍然表示< code>f()实际上是虚拟的,并且覆盖了基类方法。 哪一种方法更合适?