当前位置: 首页 > 知识库问答 >
问题:

转换蟒蛇 Lambda 函数而不将值返回到派斯帕克

茹航
2023-03-14

我在Python中有一个有效的lambda函数,它计算dataset1中的每个字符串和dataset2中的字符串之间的最高相似度。在迭代过程中,它将字符串、最佳匹配和相似性以及一些其他信息写入bigquery。没有返回值,因为该函数的目的是向bigquery数据集中插入一行。这个过程需要相当长的时间,这就是为什么我想使用Pyspark和Dataproc来加速这个过程。

将熊猫的数据帧转换成spark很容易。我在注册我的udf时遇到了麻烦,因为它没有返回值,而pyspark需要一个。此外,我不明白如何将python中的‘apply’函数映射到pyspark变体。所以基本上我的问题是如何转换下面的python代码来处理spark数据帧。

以下代码适用于常规 Python 环境:

def embargomatch(name, code, embargo_names):
     find best match 
     insert best match and additional information to bigquery

customer_names.apply(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names),axis=1)

因为pyspark需要一个返回类型,所以我在udf中添加了“return 1 ”,并尝试了以下方法:


customer_names = spark.createDataFrame(customer_names)

from pyspark.sql.types import IntegerType
embargo_match_udf = udf(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names), IntegerType())

现在我一直在尝试应用select函数,因为我不知道要给出什么参数。

共有1个答案

松霖
2023-03-14

我怀疑您对如何将多列传递给udf感到困惑——这是这个问题的一个很好的答案:Pyspark:在UDF中传递多列。

与其基于包装函数的 lambda 创建 udf,不如考虑通过直接基于禁运匹配创建 udf 来简化。

embargo_names = ...

# The parameters here are the columns passed into the udf
def embargomatch(name, customer_code):
    pass
embargo_match_udf = udf(embargomatch, IntegerType())
customer_names.select(embargo_match_udf(array('name', 'customer_code')).alias('column_name'))

话虽如此,怀疑您的udf不会返回任何内容 - 我通常将udfs视为向数据帧添加列的一种方式,但不会产生副作用。如果要将记录插入到 bigquery 中,请考虑执行如下操作:

customer_names.select('column_name').write.parquet('gs://some/path')
os.system("bq load --source_format=PARQUET [DATASET].[TABLE] gs://some/path")
 类似资料:
  • 我试图了解异步/等待如何与promise一起工作。 据我所知,await应该是阻塞的,在上面的代码中,它似乎阻塞了返回带有原语

  • 我使用这个MCP3561,没有外部时钟(MCLK),也没有使用中断(irq)(都是浮动的)。 数据表 我最初试图运行快速命令来获取adcdata,但它全部返回0. 然后我试图相应地设置所有配置位,并且当读取adcdata时,它仍然返回数据的所有0。但是在读取所有地址的增量中,我可以看到配置中的数据很好。 我的阅读功能: 我读到的数据是: r_buf[3]返回写入的配置0和。。。。以r_buf[28

  • 我正在运行Ubuntu 18.04。 我使用mysql连接器-python连接Python到MySQL。 我使用的是Python 3.6.7,并且已经安装了mysql连接器-python。 我已经安装了mysql连接器-python-py3_8.0.13-1ubuntu18.10_all.deb. 在运行Python脚本时,mysql。连接器模块似乎加载正确,但脚本在碰到光标时失败。next()具

  • 假设我有一些资源,我想在用python编写的aws lambda中的不同请求之间共享。我应该如何实现这一点? 是否有“启动后”挂钩,或者我应该在第一次调用时惰性地创建资源?“延迟初始化”的缺点是,它意味着一些请求会随机变慢,因为您选择了一个消费者来承担启动成本。 此外…这些资源会在lambda可执行文件被“冻结”后幸存下来吗? 本页https://docs.aws.amazon.com/lambd

  • 从数据库中提取博客。 为什么这里的blogContent在函数的上下文中是全局的,而我们正在内部更改它,所以它应该返回值,有人能解释为什么会这样吗? 有没有不使用promise/async/await的方法>>>有人给了我答案 我如何从异步调用返回响应? 但这一点我不知道,最主要的是我不知道Ajax。

  • 我用的是美丽的汤。它给我一些超文本标记语言节点的文本,但是这些节点有一些Unicode字符,这些字符被转换为字符串中的转义序列。 例如,一个具有以下: 的 HTML 元素由美丽的汤检索,如:作为以下字符串:,这只能在 Python 控制台中读取。但是,当写入JSON文件时,它变得不可读。注意:我使用以下代码保存到 JSON 文件: 我如何将这些Unicode字符转换回UTF-8或任何使它们可读的东