当前位置: 首页 > 知识库问答 >
问题:

检测到PySpark中文字列上的内部连接的笛卡尔积

凌和颂
2023-03-14
first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()
Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

由于某种原因,当RuleExecutor应用名为CheckCartesianProducts的优化规则集时,这些逻辑计划的连接条件中似乎没有列(参见https://github.com/apache/spark/blob/v2.3.0/sql/catalys/src/main/scala/org/apache/spark/sql/catalys/optimizer/optimizer.scala#L1114)。

但是,如果我在JOIN之前使用“persist”方法,它可以工作,物理计划是:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

所以,也许有人可以解释导致这种结果的内部原因,因为持久化数据帧看起来并不是一个解决方案。

共有1个答案

华谭三
2023-03-14

问题是,一旦持久化数据,second_id就会合并到缓存的表中,不再被视为常量。因此,规划者不能再推断查询应该表示为笛卡尔积,而是在散列分区的sortmergejoin上使用标准的second_id

使用UDF在没有持久性的情况下实现相同的结果是微不足道的

from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.SCALAR)                   
def identity(x):                                                   
    return x    

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                         first_df.first_id == second_df.second_id,
                         'inner')

result_df.explain()
== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

但是,sortmergejoin不是您应该在这里尝试实现的。如果使用常量键,它将导致极端的数据偏斜,并且很可能失败,而不是玩具数据。

 类似资料:
  • 我正在尝试加入两个数据集 DS2: 当我执行下面的操作时,输出 但当我做下面的操作时,我会出错

  • 主要内容:Oracle CROSS JOIN子句简介,Oracle Cross Join示例在本教程中,您将学习如何使用Oracle 创建连接表的笛卡尔积。 Oracle CROSS JOIN子句简介 在数学中,给定两个集合和,的笛卡尔乘积是所有有序对(,)的集合,属于,属于。 要在Oracle中创建表的笛卡尔乘积,可以使用子句。 以下说明了子句的语法: 与其他连接(如或)不同,没有连接谓词的子句。 当执行两个没有关系的表的交叉连接时,将得到两个表的行和列的笛卡尔乘积。 当您想要生成大量

  • 问题内容: 我正在尝试编写一些代码来测试一堆输入参数的笛卡尔积。 我看过了,但是它的功能并不是我想要的。有没有一种简单明了的方法来获取一个字典,每个字典中包含任意数量的键 和 每个值中任意数量的元素,然后生成具有下一个排列的字典? 输入: 输出示例: 问题答案: 好的,感谢@dfan告诉我我在错误的位置查看。我现在知道了: 编辑 :经过多年的Python经验,我认为一个更好的解决方案是接受输入,而

  • 我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?

  • 问题内容: 在Tensorflow中有什么简单的方法可以像itertools.product一样做笛卡尔积吗?我想获得两个张量(和)的元素组合,在Python中可以通过itertools作为。我正在Tensorflow中寻找替代方案。 问题答案: 我将在此假定和均为一维张量。 为了得到两者的笛卡尔积,我会用的组合和: 您使用LEN(一) LEN(B) 2张量,其中的元件的每个组合结束并且在最后一维