当前位置: 首页 > 知识库问答 >
问题:

Spark scala 数据框 udf 返回行

益富
2023-03-14

假设我有一个数据帧,其中包含一个列(称为colA ),这是一个行序列。我想在可乐的每个记录中添加一个新字段。(而新的字段与之前的记录关联,所以我必须写一个udf。)这个udf应该怎么写?

我尝试编写一个udf,它将colA作为输入,并输出Seq[Row],其中每个记录都包含新字段。但问题是udf无法返回Seq[Row]/异常是“org.apache.spark.sql类型的模式”。不支持行“”。我该怎么办?

我编写的udf:<code>val convert=udf[Seq[Row],Seq[Row]](blablabla…

共有2个答案

薛高澹
2023-03-14

这是个老问题,我只是想根据新版Spark更新一下。

从Spark 3.0.0开始,@Raphael罗斯提到的方法被弃用。因此,您可能会得到一个分析异常。原因是使用此方法的输入闭包没有类型检查,并且在涉及空值时,行为可能与我们在SQL中预期的行为不同。

如果您真的知道自己在做什么,则需要设置<code>spark.sql.legacy。allowUntypedScalaUDF配置为true

另一种解决方案是使用<code>case class

case class Foo(field1: String, field2: String)

val convertFunction: Seq[Row] => Seq[Foo] = input => {
    input.map {
        x => // do something with x and convert to Foo
    }
}

val myUdf = udf(convertFunction)
邰昀
2023-03-14

从 spark 2.0 开始,您可以创建返回 Row / Seq[Row] 的 UDF,但您必须提供返回类型的架构,例如,如果您使用双精度数组:

val schema = ArrayType(DoubleType)

val myUDF = udf((s: Seq[Row]) => {
  s // just pass data without modification
}, schema)

但我真的无法想象这在哪里有用,我宁愿从UDF返回元组或case类(或其Seq)。

编辑:如果您的行包含超过22个字段(元组/案例类的字段限制),则这可能很有用

 类似资料:
  • 问题内容: Python pandas具有pct_change函数,可用于计算数据帧中股票价格的回报: 我正在使用以下代码获取对数返回值,但它给出的值与pct.change()函数完全相同: 问题答案: 这是一种使用来计算日志返回的方法。结果与所计算的总收益相似但不相同。您可以上传示例数据的副本(Dropbox共享链接)以重现您看到的不一致之处吗?

  • 我正在尝试在PySpark中为两个数据框(df1和df2)创建自定义连接(类似于此),代码如下所示: 我得到的错误消息是: 有没有办法编写一个可以处理来自两个单独数据帧的列的 PySpark UDF?

  • 问题内容: 对于数据框 我有兴趣按名称和等级分组,并且可能会得到汇总值 但是我想在原始字段中获得一个字段,其中包含该行的组号,例如 有没有在熊猫中做到这一点的好方法? 我可以用python来获得 但是在大型数据框上它的运行速度很慢,因此我认为可能会有更好的内置熊猫方法来做到这一点。 问题答案: 很多方便的东西存储在对象中。例如: 所以: 潜伏在某个地方可能会有更好的别名,但是无论如何这应该起作用。

  • 我使用的是sparkSql 1.6.2(Java API),我必须处理下面的DataFrame,其中包含两列中的值列表: 所需的表为: 我想我必须使用爆炸函数和自定义UDF函数的组合。 null register(“combineUDF”,combineUDF,retSchema); 任何帮助都将非常感谢。 更新:我试图首先实现zip(AttributeName,AttributeValue),所

  • 火花UDF是否可能返回多个值?如果是这样,如何在数据框架API中访问各个项目。

  • 如何只在pos时返回case类