我正在尝试为以下sql查询编写pyspark代码:
Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a
join
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int
上面的查询在两个表之间进行连接,并使用带有“on”子句的“between”子句。我写了一个UDF,做了同样的事情,但是看起来非常慢。有没有什么方法可以用pyspark代码编写上面的查询,这样可以获得更好的性能。
下面是我正在使用的代码
def ip_mapping(ip_int):
ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
result = spark.sql(ip_qry)
country_code = result.rdd.map(lambda x: x['country_code']).first()
return country_code
ip_mapped = udf(ip_mapping, IntegerType())
df_final = df.withColumn("country_code", ip_mapped("ip_int"))
这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。
df_final = df.withColumn("region_code", ip_mapped("ip_int"))
您可以使用 between 定义连接条件,
并在连接中利用它。下面的示例应该适合您。
join_condition = [nk_ip_address_check.ip_number.between(ip_additional_pulse.ip_start_int,ip_additional_pulse.ip_end_int)]
nk_ip_address_check.alias('a')\
.join(ip_additional_pulse.alias('b'),cond)\
.selectExpr("a.ip_address",
"a.ip_number",
"b.ip_start_int",
"b.ip_end_int",
"b.post_code_id",
"b.city",
"b.region_name",
"b.two_letter_country")
因此,对于DF中的每个IP,您在IP的另一个DF中执行搜索-
简单的解决方案 -
无论如何,您应该对每个 IP 执行一次操作,并返回特定 IP 的所有 GeoIP 数据。
您的ip_mapping函数应返回项目列表(例如:(country_code,city_code,region_code))
您的UDF应该使用数组模式,UDF的结果将是多列输出(请参见https://stackoverflow.com/a/35323680/5088142更多信息)
我在dataframe中总共有100列。我试图比较两个数据帧,并找到列名不匹配的记录。我得到了以下代码的输出,但当我运行100列的代码时,作业被中止。 我正在为SCD类型2增量进程错误查找执行此操作。 请建议任何其他方式。
我想找出使用内部联接联接时两个数据帧的列值之间的差异。 df1有10列,即。key 1, key 2 现在我想比较连接的df3中已经存在的两个数据帧df1和df2的对应列。 现在我对zip(df1.columns,df2.columns)中的每个x,y进行循环,并存储在list < code > un match list . append((df3 . select(df1 . x,df2.y)
我有两个数据帧df1和df2,其中df2是df1的子集。我如何获得一个新的数据帧(df3),它是两个数据帧之间的差值? 换句话说,一个数据帧,它包含了df1中所有的行/列,而不是DF2中的行/列?
我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
我有两个熊猫数据帧(示例是说明性的)。df1['list\u of_keywords']和df2['list\u of_words']列分别包含单词列表。 df1: df2: 对于df1['list\u of_keywords'](源单元格)中的每个单元格,我想了解df2['list\u of_words'](目标单元格)中是否存在匹配的单元格。匹配单元格定义为包含源单元格中的所有单词的单元格(它