当前位置: 首页 > 知识库问答 >
问题:

pyspark内部连接的替代方法,用于比较pyspark中的两个数据帧

毛德华
2023-03-14

我在pyspark有两个数据框。如下所示,df1保存来自传感器的整个long_lat。第二个数据帧df2是第一个数据帧的子集,其中lat-long值被向上舍入到2位小数,然后删除重复项以保留唯一的lat_long数据点。

DF1:

+-----------------+---------+-----+--------------------+----------+------------+
|              UID|    label|value|            datetime|  latitude|   longitude|
+-----------------+---------+-----+--------------------+----------+------------+
|1B0545GD6546Y|evnt     | 3644|2020-06-08T23:32:...|40.1172005|-105.0823546|
|1B0545GD6FG67|evnt     | 3644|2020-06-08T23:32:...|40.1172201|-105.0821007|
|15GD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172396|-105.0818468|
|1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172613|-105.0815929|
|1BGD6546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1172808|-105.0813368|
|1B054546YFG67|evnt     | 3644|2020-06-08T23:32:...|40.1173003|-105.0810742|
|1B056546YFG67|evnt     | 3644|2020-06-08T23:32:...| 40.117322|-105.0808073|

df2:

+-------+--------+----------------+--------------+                              
|new_lat|new_long|        lat_long|    State_name|
+-------+--------+----------------+--------------+
|  40.13|  -105.1|[40.13, -105.1] |      Colorado|
|  40.15| -105.11|[40.15, -105.11]|      Colorado|
|  40.12| -105.07|[40.12, -105.07]|      Colorado|
|  40.13| -104.99|[40.13, -104.99]|      Colorado|
|  40.15| -105.09|[40.15, -105.09]|      Colorado|
|  40.15| -105.13|[40.15, -105.13]|      Colorado|
|  40.12| -104.94|[40.12, -104.94]|      Colorado|

因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算状态名称。

现在我想在 df1 中填充状态名称。由于 df2 的 lat_long 值向上舍入到 2 位十进制,为了匹配我使用如下所示的阈值,我在这里使用连接操作。

threshold = 0.01

df4 = df1.join(df2)\
        .filter(df2.new_lat - threshold < df1.latitude)\
        .filter(df1.latitude < df2.new_lat + threshold)

有没有其他有效的方法来实现同样的目标?因为连接操作是做笛卡尔积的,需要时间和大量的任务。

考虑一下,我的df1将有1万亿记录。

任何,帮助将不胜感激。

共有1个答案

翟嘉年
2023-03-14

每当使用较小的数据帧加入大型数据帧时,都应始终尝试执行广播联接。

如果df2足够小,可以广播,那么df1.join(广播(df2))将更高性能。

join() 方法的第二个参数应该是连接条件。

def approx_equal(col1, col2, threshold):
    return abs(col1 - col2) < threshold

threshold = lit(0.01)

df4 = df1.join(broadcast(df2), approx_equal(df2.new_lat, df1.latitude, threshold) && approx_equal(df2.new_long, df1. longitude, threshold))

编辑:我在quinn中添加approx_equal函数,因此您的代码可以更简洁:

import quinn as Q

threshold = lit(0.01)

df4 = df1.join(broadcast(df2), Q.approx_equal(df2.new_lat, df1.latitude, threshold) && Q.approx_equal(df2.new_long, df1. longitude, threshold))
 类似资料:
  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?

  • 所以我一直在寻找不同的方法来比较两个没有键列的PySpark数据帧。 假设我有两个数据帧,df1 我的想法是,我将得到一个输出数据帧,其中包含df1中与df2中任何行都不匹配的行,反之亦然。我还想要一些标志,以便区分df1中的行和df2中的行。 到目前为止,我已经将完全外部连接视为方法,例如: 完整的外部连接的问题是我可能需要处理一些非常大的数据帧(100万记录),我关心效率。我想过使用反左连接和

  • 问题内容: 我正在与n列的PySpark DataFrame。我有一组m列(m <n),我的任务是选择其中包含最大值的列。 例如: 输入:PySpark DataFrame包含: Ouput: 在PySpark中有什么方法可以执行此操作,还是应该将PySpark df转换为Pandas df,然后执行操作? 问题答案: 您可以减少在列列表中使用SQL表达式: Spark 1.5+还提供, 如果要保

  • 我有两个包含GB数据的大型pyspark数据框df1和df2。第一个数据框中的列是id1,col1。第二个数据框中的列是id2,col2。数据框的行数相等。id1和id2的所有值都是唯一的。id1的所有值也正好对应一个值id2。 因为。前几个条目与df1和df2区域相同,如下所示 DF1: df2: 所以我需要连接键 id1 和 id2 上的两个数据帧。df = df1.join(df2, df1

  • 我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?