当前位置: 首页 > 知识库问答 >
问题:

加入两个pyspark数据帧,使用之间子句从一系列IP中查找IP详细信息

巫马正卿
2023-03-14

我正在尝试为以下sql查询编写pyspark代码:

Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a 
join 
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int

上面的查询在两个表之间进行连接,并使用带有“on”子句的“between”子句。我写了一个UDF,做了同样的事情,但是看起来非常慢。有没有什么方法可以用pyspark代码编写上面的查询,这样可以获得更好的性能。

下面是我正在使用的代码

def ip_mapping(ip_int):
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())  
df_final = df.withColumn("country_code", ip_mapped("ip_int"))

这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。

df_final = df.withColumn("region_code", ip_mapped("ip_int"))

共有2个答案

毛德华
2023-03-14

您可以使用 between 定义连接条件并在连接中利用它。下面的示例应该适合您。

join_condition = [nk_ip_address_check.ip_number.between(ip_additional_pulse.ip_start_int,ip_additional_pulse.ip_end_int)]

nk_ip_address_check.alias('a')\
    .join(ip_additional_pulse.alias('b'),cond)\
    .selectExpr("a.ip_address",
                "a.ip_number",
                "b.ip_start_int",
                "b.ip_end_int",
                "b.post_code_id",
                "b.city",
                "b.region_name",
                "b.two_letter_country")
郑锋
2023-03-14

因此,对于DF中的每个IP,您在IP的另一个DF中执行搜索-

简单的解决方案 -

无论如何,您应该对每个 IP 执行一次操作,并返回特定 IP 的所有 GeoIP 数据。

您的ip_mapping函数应返回项目列表(例如:(country_code,city_code,region_code))

您的UDF应该使用数组模式,UDF的结果将是多列输出(请参见https://stackoverflow.com/a/35323680/5088142更多信息)

 类似资料:
  • 我在dataframe中总共有100列。我试图比较两个数据帧,并找到列名不匹配的记录。我得到了以下代码的输出,但当我运行100列的代码时,作业被中止。 我正在为SCD类型2增量进程错误查找执行此操作。 请建议任何其他方式。

  • 我想找出使用内部联接联接时两个数据帧的列值之间的差异。 df1有10列,即。key 1, key 2 现在我想比较连接的df3中已经存在的两个数据帧df1和df2的对应列。 现在我对zip(df1.columns,df2.columns)中的每个x,y进行循环,并存储在list < code > un match list . append((df3 . select(df1 . x,df2.y)

  • 我有两个数据帧df1和df2,其中df2是df1的子集。我如何获得一个新的数据帧(df3),它是两个数据帧之间的差值? 换句话说,一个数据帧,它包含了df1中所有的行/列,而不是DF2中的行/列?

  • 我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有两个熊猫数据帧(示例是说明性的)。df1['list\u of_keywords']和df2['list\u of_words']列分别包含单词列表。 df1: df2: 对于df1['list\u of_keywords'](源单元格)中的每个单元格,我想了解df2['list\u of_words'](目标单元格)中是否存在匹配的单元格。匹配单元格定义为包含源单元格中的所有单词的单元格(它