我在pyspark有两个数据框。如下所示,df1保存来自传感器的整个long_lat。第二个数据帧df2是第一个数据帧的子集,其中lat-long值被向上舍入到2位小数,然后删除重复项以保留唯一的lat_long数据点。
DF1:
+-----------------+---------+-----+--------------------+----------+------------+
| UID| label|value| datetime| latitude| longitude|
+-----------------+---------+-----+--------------------+----------+------------+
|1B0545GD6546Y|evnt | 3644|2020-06-08T23:32:...|40.1172005|-105.0823546|
|1B0545GD6FG67|evnt | 3644|2020-06-08T23:32:...|40.1172201|-105.0821007|
|15GD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172396|-105.0818468|
|1BGD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172613|-105.0815929|
|1BGD6546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1172808|-105.0813368|
|1B054546YFG67|evnt | 3644|2020-06-08T23:32:...|40.1173003|-105.0810742|
|1B056546YFG67|evnt | 3644|2020-06-08T23:32:...| 40.117322|-105.0808073|
df2:
+-------+--------+----------------+--------------+
|new_lat|new_long| lat_long| State_name|
+-------+--------+----------------+--------------+
| 40.13| -105.1|[40.13, -105.1] | Colorado|
| 40.15| -105.11|[40.15, -105.11]| Colorado|
| 40.12| -105.07|[40.12, -105.07]| Colorado|
| 40.13| -104.99|[40.13, -104.99]| Colorado|
| 40.15| -105.09|[40.15, -105.09]| Colorado|
| 40.15| -105.13|[40.15, -105.13]| Colorado|
| 40.12| -104.94|[40.12, -104.94]| Colorado|
因此,df2 的行数比第一个少得多。在 df2 中,我应用了一个 udf 来计算状态名称。
现在我想在 df1 中填充状态名称。由于 df2 的 lat_long 值向上舍入到 2 位十进制,为了匹配我使用如下所示的阈值,我在这里使用连接操作。
threshold = 0.01
df4 = df1.join(df2)\
.filter(df2.new_lat - threshold < df1.latitude)\
.filter(df1.latitude < df2.new_lat + threshold)
有没有其他有效的方法来实现同样的目标?因为连接操作是做笛卡尔积的,需要时间和大量的任务。
考虑一下,我的df1将有1万亿记录。
任何,帮助将不胜感激。
每当使用较小的数据帧加入大型数据帧时,都应始终尝试执行广播联接。
如果df2
足够小,可以广播,那么df1.join(广播(df2))
将更高性能。
join()
方法的第二个参数应该是连接条件。
def approx_equal(col1, col2, threshold):
return abs(col1 - col2) < threshold
threshold = lit(0.01)
df4 = df1.join(broadcast(df2), approx_equal(df2.new_lat, df1.latitude, threshold) && approx_equal(df2.new_long, df1. longitude, threshold))
编辑:我在quinn中添加了approx_equal
函数,因此您的代码可以更简洁:
import quinn as Q
threshold = lit(0.01)
df4 = df1.join(broadcast(df2), Q.approx_equal(df2.new_lat, df1.latitude, threshold) && Q.approx_equal(df2.new_long, df1. longitude, threshold))
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
我有一个pyspark数据帧(df1 ),它由10K行组成,数据帧看起来像- 另一个pyspark数据帧(df2)由100k记录组成,看起来像- 我想使用pyspark内连接,最终的数据帧看起来像- df2中mobile_no的长度是12,但df1中是10。我可以加入它,但这是昂贵的操作。使用pyspark有帮助吗?
所以我一直在寻找不同的方法来比较两个没有键列的PySpark数据帧。 假设我有两个数据帧,df1 我的想法是,我将得到一个输出数据帧,其中包含df1中与df2中任何行都不匹配的行,反之亦然。我还想要一些标志,以便区分df1中的行和df2中的行。 到目前为止,我已经将完全外部连接视为方法,例如: 完整的外部连接的问题是我可能需要处理一些非常大的数据帧(100万记录),我关心效率。我想过使用反左连接和
问题内容: 我正在与n列的PySpark DataFrame。我有一组m列(m <n),我的任务是选择其中包含最大值的列。 例如: 输入:PySpark DataFrame包含: Ouput: 在PySpark中有什么方法可以执行此操作,还是应该将PySpark df转换为Pandas df,然后执行操作? 问题答案: 您可以减少在列列表中使用SQL表达式: Spark 1.5+还提供, 如果要保
我有两个包含GB数据的大型pyspark数据框df1和df2。第一个数据框中的列是id1,col1。第二个数据框中的列是id2,col2。数据框的行数相等。id1和id2的所有值都是唯一的。id1的所有值也正好对应一个值id2。 因为。前几个条目与df1和df2区域相同,如下所示 DF1: df2: 所以我需要连接键 id1 和 id2 上的两个数据帧。df = df1.join(df2, df1
我有两个具有大量(几百万到几千万)行的数据帧。我想为他们牵线搭桥。 在我目前使用的BI系统中,您可以通过首先对特定键进行分区,然后在该键上进行连接来快速完成此操作。 这是我在Spark中需要遵循的模式吗,或者这并不重要?乍一看,在分区之间转移数据似乎浪费了很多时间,因为没有正确地进行预分区。 如果有必要,我该怎么做?