我有两个数据帧df1
和ip2Country
df1
包含IP地址,我正在尝试将IP地址映射到地理位置信息,如经度和纬度,它们是ip2Country
中的列。
我运行它作为一个火花提交作业,但操作花了很长时间,即使df1
只有不到2500行。
我的代码:
val agg =df1.join(ip2Country, ip2Country("network_start_int")=df1("sint") , "inner") .select($"src_ip" ,$"country_name".alias("scountry") ,$"iso_3".alias("scode") ,$"longitude".alias("slong") ,$"latitude".alias("slat") ,$"dst_ip",$"dint",$"count") .filter($"slong".isNotNull) val agg1 =agg.join(ip2Country, ip2Country("network_start_int")=agg("dint") , "inner") .select($"src_ip",$"scountry" ,$"scode",$"slong" ,$"slat",$"dst_ip" ,$"country_name".alias("dcountry") ,$"iso_3".alias("dcode") ,$"longitude".alias("dlong") ,$"latitude".alias("dlat"),$"count") .filter($"dlong".isNotNull)
有没有其他方法可以加入这两张桌子?还是我做错了?
如果你有一个大的数据帧需要与一个小的数据帧连接,广播连接是非常有效的。阅读这里:广播连接(又名地图侧连接)
bigdf.join(broadcast(smalldf))
我正在通过API分页,并将请求保存到数据帧。 我一次可以收集100行,这个循环目前运行一个多小时。 我担心这是因为一旦我达到100000行以上,添加下一个100行就会变得非常低效。 这是我当前的代码: 为了尽可能提高效率,我在整个请求中只保留了3列。我还会删除100行中出现的任何重复项。
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
我有两个数据帧需要以我正在努力的特定方式连接。 数据帧 1: 数据框2: 期望结果: 基本上-它应该在上连接df1和df2,但是如果df2中不存在,那么生成的应该是df1中的。 我尝试了 ),但这显然在列中留下了 以供 ,因为它在 df2 中没有匹配的域。对于此特定情况,我如何将它们添加到列中?
我有两个火花DF,我需要加入。只选择df1中存在的df2中的值,不应该有重复的行。 例如: df1: df2: 我正在做以下工作: 但是我的输出有几个重复的行。 如果val从df1中删除,我试图实现一个类似except的操作。但是除了之外,
我有两个Spark数据帧,每个数据帧有5.39亿行和4列。A和B列是字符串,C、D、E和F列是浮点数 我想加入DF1(5.39亿行)和DF2(5.39万行)。我在50节点集群和8GB执行器内存上尝试了DF1.join(DF2,“fullouter”)。它会自动终止集群,并显示内存不足错误消息。 是否有替代方法与rdds连接,或者比df.join()内存效率高的数据集?
基于“SC”代码,我需要将SRCTable与RefTable-1或RefTable-2连接起来 条件:如果SC为“D”,则SRCTable在KEY=KEY1上与RefTable-1连接以获得值。否则,如果SC为“U”,则SRCTable与键=键2上的RefTable-2连接 这是输入spark数据帧。 预期产出: 注意:输入表将有数百万条记录,因此需要一个优化的解决方案