当前位置: 首页 > 知识库问答 >
问题:

withColumn上的PySPARK UDF替换column

咸亦
2023-03-14

编写此自定义项是为了用变量替换列的值。Python 2.7;Spark 2.2.0

import pyspark.sql.functions as func

    def updateCol(col, st):
       return func.expr(col).replace(func.expr(col), func.expr(st))

  updateColUDF = func.udf(updateCol, StringType())

变量L_1到L_3更新了每行的列。我这样称呼它:

updatedDF = orig_df.withColumn("L1", updateColUDF("L1", func.format_string(L_1))). \
                withColumn("L2", updateColUDF("L2", func.format_string(L_2))). \
                withColumn("L3", updateColUDF("L3", 
                withColumn("NAME", func.format_string(name)). \
                withColumn("AGE", func.format_string(age)). \
                select("id", "ts", "L1", "L2", "L3",
                     "NAME", "AGE")

错误是:

return Column(sc._jvm.functions.expr(str))
AttributeError: 'NoneType' object has no attribute '_jvm'

共有2个答案

鲍永春
2023-03-14

错误是因为您正在udf中使用pyspark函数。了解母语和二语的内容也会很有帮助。。变量。

然而,如果我理解了你想正确地做什么,你就不需要自定义项。我假设L1,L2等是常数,对吗?如果不让我知道相应地调整代码。举个例子:

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F


conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()

data = [{'L1': "test", 'L2': "data"}, {'L1': "other test", 'L2': "other data"}]
df = spark_session.createDataFrame(data)
df.show()

# +----------+----------+
# |        L1|        L2|
# +----------+----------+
# |      test|      data|
# |other test|other data|
# +----------+----------+

L1 = 'some other data'
updatedDF = df.withColumn(
    "L1",
    F.lit(L1)
)
updatedDF.show()
# +---------------+----------+
# |             L1|        L2|
# +---------------+----------+
# |some other data|      data|
# |some other data|other data|
# +---------------+----------+


# or if you need to replace the value in a more complex way
pattern = '\w+'
updatedDF = updatedDF.withColumn(
    "L1",
    F.regexp_replace(F.col("L1"), pattern, "testing replace")
)

updatedDF.show()
# +--------------------+----------+
# |                  L1|        L2|
# +--------------------+----------+
# |testing replace t...|      data|
# |testing replace t...|other data|
# +--------------------+----------+

# or even something more complicated:
# set L1 value to L2 column when L2 column equals to data, otherwise, just leave L2 as it is
updatedDF = df.withColumn(
    "L2",
    F.when(F.col('L2') == 'data', L1).otherwise(F.col('L2'))
)
updatedDF.show()

# +----------+---------------+
# |        L1|             L2|
# +----------+---------------+
# |      test|some other data|
# |other test|     other data|
# +----------+---------------+

所以你的例子是:

DF = orig_df.withColumn("L1", pyspark_func.lit(L_1))
...

另外,请确保在此之前您有一个活跃的火花会话

我希望这能有所帮助。

编辑:如果L1、L2等是列表,那么一个选项是用它们创建一个数据帧并连接到初始df。不幸的是,我们需要索引来连接,因为您的数据帧相当大,我认为这不是一个性能非常好的解决方案。我们也可以使用广播和udf,或者广播和加入。

下面是一个如何进行连接的(我认为是次优的)示例:

L1 = ['row 1 L1', 'row 2 L1']
L2 = ['row 1 L2', 'row 2 L2']

# create a df with indexes    
to_update_df = spark_session.createDataFrame([{"row_index": i, "L1": row[0], "L2": row[1]} for i, row in enumerate(zip(L1, L2))])

# add indexes to the initial df 
indexed_df = updatedDF.rdd.zipWithIndex().toDF()
indexed_df.show()
# +--------------------+---+
# | _1 | _2 |
# +--------------------+---+
# | [test, some other... | 0 |
# | [other test, othe... | 1 |
# +--------------------+---+

# bring the df back to its initial form
indexed_df = indexed_df.withColumn('row_number', F.col("_2"))\
    .withColumn('L1', F.col("_1").getItem('L1'))\
    .withColumn('L2', F.col("_1").getItem('L2')).\
    select('row_number', 'L1', 'L2')

indexed_df.show()
# +----------+----------+---------------+
# |row_number|        L1|             L2|
# +----------+----------+---------------+
# |         0|      test|some other data|
# |         1|other test|     other data|
# +----------+----------+---------------+

# join with your results and keep the updated columns
final_df = indexed_df.alias('initial_data').join(to_update_df.alias('other_data'), F.col('row_index')==F.col('row_number'), how='left')
final_df = final_df.select('initial_data.row_number', 'other_data.L1', 'other_data.L2')
final_df.show()

# +----------+--------+--------+
# |row_number|      L1|      L2|
# +----------+--------+--------+
# |         0|row 1 L1|row 1 L2|
# |         1|row 2 L1|row 2 L2|
# +----------+--------+--------+

这个^在性能方面肯定可以更好。

罗金林
2023-03-14

尝试创建示例数据帧,然后使用PySpark中的lit函数。

似乎工作正常,这是使用数据库笔记

 类似资料:
  • 例如,我有这样的代码 如何将“\on the”替换为replace eAll()?

  • 我只想知道如何用另一个文件完全替换用户的主机文件?注意:我只想给用户我的. exe编译文件(附加我自己的主机文件),并且在运行exe文件后,用户的主机文件应替换为我自己的主机文件,我附加到我的exe文件。

  • Git 对象是不可改变的,但它提供一种有趣的方式来用其他对象假装替换数据库中的 Git 对象。 replace 命令可以让你在 Git 中指定一个对象并可以声称“每次你遇到这个 Git 对象时,假装它是其他的东西”。 在你用一个不同的提交替换历史中的一个提交时,这会非常有用。 例如,你有一个大型的代码历史并想把自己的仓库分成一个短的历史和一个更大更长久的历史,短历史供新的开发者使用,后者给喜欢数据

  • 然后我试着这样称呼它。 不幸的是,修改URL编码路径替换后,我最终请求而不是。是否有任何方法可以禁用路径替换的URL编码或跨多个路径段进行替换?不幸的是,我甚至不知道有多少路径段,它都是由API控制的。

  • 问题内容: 我正在尝试运行一个SSH到远程主机上的bash脚本,并停止正在运行的单个docker容器。 但是,出现以下错误: 当我手动执行此操作(向计算机SSH,运行命令)时,一切都很好,但是当尝试通过脚本执行操作时,我得到了错误。我猜我的命令替换语法不正确,我已经搜索并尝试了各种引号等,但无济于事。 谁能指出我要去哪里了? 问题答案: 在启动您的Heredoc时,请使用(或-仅引用第一个字符将具

  • 我正在使用一些非常古老的。 我试图通过删除非字母字符(标点符号和数字等)来净化一些输入 通常我会做这样的事情: 然而,是在中引入的!所以它不会编译!http://docs.oracle.com/javase/7/docs/api/java/lang/string.html#ReplaceAll(java.lang.string,java.lang.string) 在之前,我们是如何做到这一点的?