当前位置: 首页 > 知识库问答 >
问题:

Pyspark-使用UDF创建具有StructType的新列

莘聪
2023-03-14
df.withColumn("event", ..)

如何将一个新的列事件添加到数据帧中,该事件将是< code>generate_header的结果?如何添加一行作为列值?

可能我们需要将函数转换为UDF

def generate_header(df_row):
    
    header = {
        "id": 1,
        ...
    }

    return EntityEvent(header, df_row)


class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload

假设我们有这样的东西

+---------------+--------------------+
|book_id        |Author              |
+---------------+--------------------+
|865731         |{name: 'A',  }      |
+---------------+--------------------+

我们想得到这个

+---------------+--------------------+------------------------------
|book_id        |Author              | event                        |
+---------------+--------------------+------------------------------+
|865731         |{name: 'A',  }      | {header: { id: '865731'}, payload: {name: 'A'}}
+---------------+--------------------+----------------------------------------------------------

共有1个答案

滑乐逸
2023-03-14

您可以使用< code>create_map在列中生成映射类型。

(df.withColumn('event', F.create_map(
    F.lit('header'), F.create_map(F.lit('id'), F.col('book_id')),
    F.lit('payload'), F.col('Author'))
)

fyi:你可能不能在火花列中有Python对象。是否可以将自定义类对象作为列值存储在 Spark 数据框中?

更新:

如果您需要派生包含一些Python库函数的部分。

import base64

# udf function takes the return type schema.
@F.udf(MapType(StringType(), MapType(StringType(), StringType())))
def generate_header(book_id, author):
    b64str = base64.b64encode('some text'.encode('utf-8'))
    return {
        'header': { 'id': book_id, 'key': b64str },
        'payload': author
    }

df.withColumn('event', generate_header(F.col('book_id'), F.col('Author')))
 类似资料:
  • 现在,我想在一个函数中使用这个,如下所示- 然后使用此函数在我的DataFrame中创建一个新列 总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]] 我得到以下错误。我在这里做错了什么? 原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式

  • 我一直在尝试用py函数在pyspark中实现udf,如下所示: 它采用了我之前训练过的bin模型。 input_text列包含普通文本,df是包含整个数据的数据框。 我在哪里得到以下错误: ​ Fasttext当前正在运行,python函数在同一个笔记本上运行没有任何问题。 谢谢你的帮助,

  • 问题内容: 使用Spark 1.6,我有一个Spark (命名为),其值分别为A,B,C,DS,DNS,E,F,G和H,我想使用下面的值创建一个新列(例如),我该如何映射?(因此fi’A’需要映射为’S’等。) 问题答案: 使用UDF(与版本无关)的低效率解决方案: 结果: 创建文字的效率更高( Spark > = 2.0,Spark <3.0): 结果相同: 但更有效的执行计划: 与UDF版本相

  • 我想要一个udf函数,它遍历列“Values”,并检查下一个值是否是当前行值的50%或更多。如果它在50%之内,那么我希望包含值“是”,如果不是,那么我不希望包含值。如果该值在最后一个值和下一个值之间下降得太快,则不应将其包括在内,但如果该值逐渐下降,且与最后一个包括的值相比不超过50%,则可以。这就是为什么。未包括id 5的1,但。包含id 9的1,因为它遵循的值从逐渐下降。4不超过50%。我曾

  • 我有一个PySpark Dataframe,它有两列(,,其类型为),其值为或。我正在尝试添加一个新列,这是这两个列的总和。我遵循Pyspark中的示例:在UDF中传递多列 这显示了一系列的< code>NULL,而不是我期望的结果。 我尝试了以下任何一种方法,以查看数据类型是否存在问题 仍然得到空值。 我试着移除阵列: 这可以正常工作并显示 我试着移除UDF,但是离开了阵列: 这可以正常工作并显

  • 我是新的spark和python,面临着从元数据文件构建模式的困难,该模式可以应用于我的数据文件。场景:数据文件的元数据文件(csv格式),包含列及其类型:例如: 我已成功将其转换为如下数据帧: 但是当我尝试用这个将其转换为StructField格式时 或 然后使用 我得到以下错误: 一旦我准备好了模式,我想使用createDataFrame来应用于我的数据文件。这个过程必须为许多表完成,所以我不