当前位置: 首页 > 知识库问答 >
问题:

在UDF PySpark中传递多个列

沈凯康
2023-03-14

我想计算PySpark数据帧的两列之间的Jaro Winkler距离。Jaro-Winkler距离可通过所有节点上的pyjarowinkler包获得。

pyjarowinkler的工作原理如下:

from pyjarowinkler import distance
distance.get_jaro_distance("A", "A", winkler=True, scaling=0.1)

输出:

1.0

我试图编写一个UDF,将两列作为序列传递,并使用lambda函数计算距离。我是这样做的:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df  = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(str(distance_df['column_A']), str(distance_df['column_B']), winkler = True, scaling = 0.1))
    return distance_df['distance']

temp = temp.withColumn('jaro_distance', get_distance(temp.x, temp.x))

我应该能够在上述函数中传递任意两个字符串列。我得到以下输出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|         null|
|  B|  3|  4|         null|
|  C|  5|  6|         null|
|  D|  7|  8|         null|
+---+---+---+-------------+

预期产出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          1.0|
|  C|  5|  6|          1.0|
|  D|  7|  8|          1.0|
+---+---+---+-------------+

我怀疑这可能是因为str(distance_df['column_A'])不正确。它包含所有行值的连接字符串。

虽然这个代码为我工作:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col):
    return col.apply(lambda x: distance.get_jaro_distance(x, "A", winkler = True, scaling = 0.1))

temp = temp.withColumn('jaro_distance', get_distance(temp.x))

输出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          0.0|
|  C|  5|  6|          0.0|
|  D|  7|  8|          0.0|
+---+---+---+-------------+

熊猫UDF有办法做到这一点吗?我正在处理数百万张唱片,所以UDF会很贵,但如果有效,仍然可以接受。谢谢。

共有2个答案

贺波
2023-03-14

您可以首先合并所有数据帧,在分区被洗牌并分发到工作节点之后,使用相同的分区键进行分区,并在计算之前恢复它们。请查看我为这个场景编写了一个小工具包的示例:SparkyPandas

郑宇
2023-03-14

错误来自df中的函数。应用方法,将其调整到以下位置,以便修复:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df  = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(x['column_A'], x['column_B'], winkler = True, scaling = 0.1), axis=1)
    return distance_df['distance']

然而,熊猫却不例外。apply方法不是矢量化的,这不符合我们在PySpark中需要pandas_udf而不是udf的目的。一个更快、开销更小的解决方案是使用列表理解来创建返回的pd。系列(有关Pandas df.apply及其备选方案的更多讨论,请查看此链接):

from pandas import Series

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
   return Series([ distance.get_jaro_distance(c1, c2, winkler=True, scaling=0.1) for c1,c2 in zip(col1, col2) ])

df.withColumn('jaro_distance', get_distance('x', 'y')).show()
+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
| AB| 1B|  2|         0.67|
| BB| BB|  4|          1.0|
| CB| 5D|  6|          0.0|
| DB|B7F|  8|         0.61|
+---+---+---+-------------+
 类似资料:
  • 但是,当我试图将此代码更改为以下代码时,我得到了一个错误-错误:Main method,在类MyClass中找不到,请将Main method,定义为:public static void Main(string[]args)。我需要接收owner和consumerName作为我的程序的参数/输入。 ,这是怎么做到的?

  • 问题内容: 假设我有这个功能: 我想这样称呼它: 当然,不能用这种方法来完成,因为Postgres试图用该名称和三个不存在的参数来查找函数。 我试图用引号引起来,但在这种情况下,参数解释错误: data1’,’data2’,’data3 ,就像一个字符串一样。 有没有一种方法可以在参数中放置多个值,以便IN子句可以识别它? 问题答案: 您的函数将不会被创建。之后是句法废话。 无论哪种方式,带有 参

  • 例如,我的mapper.xml文件中有以下xml片段: 如您所见,with订阅只有一列 我想传递2列给它,因此得到的代码,我们怎么做?

  • 问题内容: 如何在Go中将多个外部命令通过管道传递?我已经试过了这段代码,但是看到一条错误消息。 问题答案: StdoutPipe返回一条管道,该管道将在命令启动时连接到命令的标准输出。在Wait看到命令退出后,管道将自动关闭。 (来自http://golang.org/pkg/os/exec/#Cmd.StdinPipe) 您确实关闭了事实。 我做了一个工作示例(只是一个演示,添加了错误捕获功能

  • 问题内容: 我试图弄清楚如何在URL中传递多个参数。我想将纬度和经度从我的android类传递给Java servlet。我怎样才能做到这一点? 在这种情况下,输出(写入文件)为。这是可行的,但我想在两个单独的参数中传递纬度和经度,以便减少在服务器端的工作。如果不可能,我如何至少在&之间添加一个空格,以便可以使用class获取经度和纬度。我试过以下行,但无济于事。 我的servlet代码如下: 我

  • 本文向大家介绍在 mapper 中如何传递多个参数?相关面试题,主要包含被问及在 mapper 中如何传递多个参数?时的应答技巧和注意事项,需要的朋友参考一下 1、第一种:   2、第二种:使用 \@param 注解:   然后,就可以在 xml 像下面这样使用(推荐封装为一个 map,作为单个参数传递给mapper)   3、第三种:多个参数封装成 map