当前位置: 首页 > 知识库问答 >
问题:

Spark:连接两个数据帧的更快方法?

阚允晨
2023-03-14

我有两个数据帧df1ip2Countrydf1包含IP地址,我正在尝试将IP地址映射到地理位置信息,如经度和纬度,它们是ip2Country中的列。

我运行它作为一个火花提交作业,但操作花了很长时间,即使df1只有不到2500行。

我的代码:

val agg =df1.join(ip2Country, ip2Country("network_start_int")=df1("sint")
, "inner")
.select($"src_ip"
,$"country_name".alias("scountry")
,$"iso_3".alias("scode")
,$"longitude".alias("slong")
,$"latitude".alias("slat")
,$"dst_ip",$"dint",$"count")
.filter($"slong".isNotNull)

val agg1 =agg.join(ip2Country, ip2Country("network_start_int")=agg("dint")
, "inner")
.select($"src_ip",$"scountry"
,$"scode",$"slong"
,$"slat",$"dst_ip"
,$"country_name".alias("dcountry")
,$"iso_3".alias("dcode")
,$"longitude".alias("dlong")
,$"latitude".alias("dlat"),$"count")
.filter($"dlong".isNotNull)

有没有其他方法可以加入这两张桌子?还是我做错了?

共有1个答案

沈曜灿
2023-03-14

如果你有一个大的数据帧需要与一个小的数据帧连接,广播连接是非常有效的。阅读这里:广播连接(又名地图侧连接)

bigdf.join(broadcast(smalldf))
 类似资料:
  • 我正在通过API分页,并将请求保存到数据帧。 我一次可以收集100行,这个循环目前运行一个多小时。 我担心这是因为一旦我达到100000行以上,添加下一个100行就会变得非常低效。 这是我当前的代码: 为了尽可能提高效率,我在整个请求中只保留了3列。我还会删除100行中出现的任何重复项。

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有两个数据帧需要以我正在努力的特定方式连接。 数据帧 1: 数据框2: 期望结果: 基本上-它应该在上连接df1和df2,但是如果df2中不存在,那么生成的应该是df1中的。 我尝试了 ),但这显然在列中留下了 以供 ,因为它在 df2 中没有匹配的域。对于此特定情况,我如何将它们添加到列中?

  • 我有两个火花DF,我需要加入。只选择df1中存在的df2中的值,不应该有重复的行。 例如: df1: df2: 我正在做以下工作: 但是我的输出有几个重复的行。 如果val从df1中删除,我试图实现一个类似except的操作。但是除了之外,

  • 我有两个Spark数据帧,每个数据帧有5.39亿行和4列。A和B列是字符串,C、D、E和F列是浮点数 我想加入DF1(5.39亿行)和DF2(5.39万行)。我在50节点集群和8GB执行器内存上尝试了DF1.join(DF2,“fullouter”)。它会自动终止集群,并显示内存不足错误消息。 是否有替代方法与rdds连接,或者比df.join()内存效率高的数据集?

  • 基于“SC”代码,我需要将SRCTable与RefTable-1或RefTable-2连接起来 条件:如果SC为“D”,则SRCTable在KEY=KEY1上与RefTable-1连接以获得值。否则,如果SC为“U”,则SRCTable与键=键2上的RefTable-2连接 这是输入spark数据帧。 预期产出: 注意:输入表将有数百万条记录,因此需要一个优化的解决方案