当前位置: 首页 > 知识库问答 >
问题:

pyspark中的内部连接

钱钊
2023-03-14

我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像-

id       mobile_no       value
1        1111111111        .43
2        2222222222        .54
3        3333333333        .03
4        4444444444        .22

另一个pyspark数据帧(df2)由100k记录组成,看起来像-

mobile_no            gender
912222222222           M
914444444444           M
919999999999           F
915555555555           M
918888888888           F

我想使用pyspark内连接,最终的数据帧看起来像-

mobile_no          value           gender
2222222222         .54               M
4444444444         .22               M

df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?

common_cust = spark.sql("SELECT mobile_number, age \
                         FROM df1 \
                         WHERE mobile_number IN (SELECT DISTINCT mobile_number FROM df2)")

共有1个答案

农雅畅
2023-03-14

一种方法可以是在df2上使用子字符串函数,只保留最后10位数字,以获得与 中相同的长度:

import pyspark.sql.functions as F

ddf2.select(F.substring('mobile_no', 3, 10).alias('mobile_no'),'gender').show()
+----------+------+
| mobile_no|gender|
+----------+------+
|2222222222|     M|
|4444444444|     M|
|9999999999|     F|
|5555555555|     M|
|8888888888|     F|
+----------+------+

然后,您只需要执行一个内部< code>join来获得您期望的输出:

common_cust = df1.select('mobile_no', 'value')\
                 .join( df2.select(F.substring('mobile_no', 3, 10).alias('mobile_no'),'gender'), 
                        on=['mobile_no'], how='inner')
common_cust.show()
+----------+-----+------+
| mobile_no|value|gender|
+----------+-----+------+
|2222222222| 0.54|     M|
|4444444444| 0.22|     M|
+----------+-----+------+

如果你想使用spark.sql,我猜你可以这样做:

common_cust = spark.sql("""select df1.mobile_no, df1.value, df2.gender
                           from df1
                           inner join df2 
                           on df1.mobile_no = substring(df2.mobile_no, 3, 10)""")
 类似资料:
  • 我在pyspark有两个数据框。如下所示,df1保存来自传感器的整个long_lat。第二个数据帧df2是第一个数据帧的子集,其中lat-long值被向上舍入到2位小数,然后删除重复项以保留唯一的lat_long数据点。 DF1: df2: 因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算状态名称。 现在我想在 df1 中填充状态名称。由于 df2 的 lat_lo

  • 我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?

  • 由于某种原因,当RuleExecutor应用名为CheckCartesianProducts的优化规则集时,这些逻辑计划的连接条件中似乎没有列(参见https://github.com/apache/spark/blob/v2.3.0/sql/catalys/src/main/scala/org/apache/spark/sql/catalys/optimizer/optimizer.scala#

  • 我试图建立一个队列研究来跟踪应用程序内的用户行为,我想问你是否知道当我使用.join()时如何在pyspark中指定条件: 这两个rdds表示有关用户的信息,其ID为'6df99638e4584a618f92a9cfdf318cf8',并在2016-10-19和2016-10-20登录到服务1和服务2。我的objectif是连接我的两个RDD,每一个至少包含20,000行。所以它必须是一个内部联接

  • 我使用的是Struts-Spring-Hibernate,我很难让我的HQL正确执行。 我有四个对象,objectA、objectB、objectC和objectD。对象A与对象B之间有一对多关系,它们之间的关系就像ObjectB与FK ObjectA之间的关系一样。Id。该模式在对象中延续,即对象B与对象C等有一对多关系。 我现在一直在做的是调用“从对象XVO到对象XId=?”来获取对象列表。我

  • 问题内容: 我想通过内部联接从更多表中选择数据。 这些是我的桌子。 我想写一份声明,显示学生去过哪个考试,年级和日期。日期后排序。 这是我的声明。它可以运行,但是我想确保自己做的正确。 问题答案: 几乎正确..查看联接,您引用的字段错误