df.withColumn("event", ..)
如何将一个新的列事件添加到数据帧中,该事件将是< code>generate_header的结果?如何添加一行作为列值?
可能我们需要将函数转换为UDF
def generate_header(df_row):
header = {
"id": 1,
...
}
return EntityEvent(header, df_row)
class EntityEvent:
def __init__(self, _header, _payload):
self.header = _header
self.payload = _payload
假设我们有这样的东西
+---------------+--------------------+
|book_id |Author |
+---------------+--------------------+
|865731 |{name: 'A', } |
+---------------+--------------------+
我们想得到这个
+---------------+--------------------+------------------------------
|book_id |Author | event |
+---------------+--------------------+------------------------------+
|865731 |{name: 'A', } | {header: { id: '865731'}, payload: {name: 'A'}}
+---------------+--------------------+----------------------------------------------------------
您可以使用< code>create_map在列中生成映射类型。
(df.withColumn('event', F.create_map(
F.lit('header'), F.create_map(F.lit('id'), F.col('book_id')),
F.lit('payload'), F.col('Author'))
)
fyi:你可能不能在火花列中有Python对象。是否可以将自定义类对象作为列值存储在 Spark 数据框中?
更新:
如果您需要派生包含一些Python库函数的部分。
import base64
# udf function takes the return type schema.
@F.udf(MapType(StringType(), MapType(StringType(), StringType())))
def generate_header(book_id, author):
b64str = base64.b64encode('some text'.encode('utf-8'))
return {
'header': { 'id': book_id, 'key': b64str },
'payload': author
}
df.withColumn('event', generate_header(F.col('book_id'), F.col('Author')))
现在,我想在一个函数中使用这个,如下所示- 然后使用此函数在我的DataFrame中创建一个新列 总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]] 我得到以下错误。我在这里做错了什么? 原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式
我一直在尝试用py函数在pyspark中实现udf,如下所示: 它采用了我之前训练过的bin模型。 input_text列包含普通文本,df是包含整个数据的数据框。 我在哪里得到以下错误: Fasttext当前正在运行,python函数在同一个笔记本上运行没有任何问题。 谢谢你的帮助,
问题内容: 使用Spark 1.6,我有一个Spark (命名为),其值分别为A,B,C,DS,DNS,E,F,G和H,我想使用下面的值创建一个新列(例如),我该如何映射?(因此fi’A’需要映射为’S’等。) 问题答案: 使用UDF(与版本无关)的低效率解决方案: 结果: 创建文字的效率更高( Spark > = 2.0,Spark <3.0): 结果相同: 但更有效的执行计划: 与UDF版本相
我想要一个udf函数,它遍历列“Values”,并检查下一个值是否是当前行值的50%或更多。如果它在50%之内,那么我希望包含值“是”,如果不是,那么我不希望包含值。如果该值在最后一个值和下一个值之间下降得太快,则不应将其包括在内,但如果该值逐渐下降,且与最后一个包括的值相比不超过50%,则可以。这就是为什么。未包括id 5的1,但。包含id 9的1,因为它遵循的值从逐渐下降。4不超过50%。我曾
我有一个PySpark Dataframe,它有两列(,,其类型为),其值为或。我正在尝试添加一个新列,这是这两个列的总和。我遵循Pyspark中的示例:在UDF中传递多列 这显示了一系列的< code>NULL,而不是我期望的结果。 我尝试了以下任何一种方法,以查看数据类型是否存在问题 仍然得到空值。 我试着移除阵列: 这可以正常工作并显示 我试着移除UDF,但是离开了阵列: 这可以正常工作并显
我是新的spark和python,面临着从元数据文件构建模式的困难,该模式可以应用于我的数据文件。场景:数据文件的元数据文件(csv格式),包含列及其类型:例如: 我已成功将其转换为如下数据帧: 但是当我尝试用这个将其转换为StructField格式时 或 然后使用 我得到以下错误: 一旦我准备好了模式,我想使用createDataFrame来应用于我的数据文件。这个过程必须为许多表完成,所以我不