当前位置: 首页 > 知识库问答 >
问题:

在Spark Scala中将ArrayType列传递给UDF

雷锋
2023-03-14

我在Scala的Spark数据框架中有一列,它是使用

 agg(collect_list(struct(col(abc), col(aaa)).as(def)

我想将此列传递给UDF,以便进一步处理,以处理此聚合列中的一个索引。

当我将参数传递给我的UDF时:

.withColumn(def, remove
            (col(xyz), col(def)))

UDF-类型为Seq[Row]:val removeUnstableActivations:UserDefinedFunction=UDF((xyz:java.util.Date,def:Seq[Row])

我收到错误:

Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported

我应该如何传递这些列,UDF中列的数据类型应该是什么?

共有1个答案

华景明
2023-03-14

事实上,不支持类型Row的模式,但您可以返回案例类。Spark将返回的案例类视为结构类型。例如:

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row

val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")

val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "aggColumn"
)

aggDf.printSchema()
// |-- aggColumn: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- number: string (nullable = true)
// |    |    |-- word: integer (nullable = false)

case class ReturnSchema(word: String, number: Int)

val myUdf: UserDefinedFunction =
  udf((collection: Seq[Row]) => {
    collection.map(r => {
      val word   = r.getAs[String]("word")
      val newNumber = r.getAs[Int]("number") * 100

      new ReturnSchema(word, newNumber)
    })
  })
  
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))

finalDf.printSchema
// root
//  |-- udfTranformedColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- word: string (nullable = true)
//  |    |    |-- number: integer (nullable = false)

finalDf.show(false)
// +------------------------------+
// |udfTranformedColumn           |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+
 类似资料:
  • 我有一个包含两列的数据帧,一列是数据,另一列是该数据字段中的字符计数。 我想根据count列中的值更改列数据的值。如何实现这一点?我尝试使用一个udf: 这似乎是失败的,这是正确的做法吗?

  • 问题内容: 我想按值将列表传递给函数。默认情况下,列表和其他复杂对象通过引用传递给函数。这是一些目标: 可以写得短些吗?换句话说,我不想更改 ad 。 问题答案: 您可以使用,但是对于包含列表(或其他可变对象)的列表,您应该使用: 等价于或,并返回列表的浅表副本。 何时使用:

  • 问题内容: 我正在尝试从SQL导出到.csv,如果我对其进行硬编码以接受一定数量的参数,则它可以正常工作。问题是,我想允许用户请求任意数量的参数,并将它们传递给where子句。该代码应该使这一点更加清楚。 所以我想我要做的是将列表传递给where子句,而不是显式的:dates#变量。例如,一个人可以使用参数“ 2012-01-0412:00、2012-02-04 12:00、2012-03-04

  • 有没有比传递分隔字符串并稍后解析它更优雅的方法来传递可选的整数列表到?我也有一个位置论点。 不工作,因为试图获取并抱怨它不是整数。 理想情况下,我想与其中一个执行 或者类似的东西,但也能

  • 我想合并火花中的多个数组类型[字符串类型]列以创建一个数组类型[字符串类型]。为了组合两列,我在这里找到了解决方案: 将Array[string]类型的两个spark sql列合并成一个新的Array[string]列 但是如果我不知道编译时的列数,我该如何进行组合呢?在运行时,我将知道所有要组合的列的名称。 一种选择是使用上面stackoverflow问题中定义的UDF,在循环中多次添加两列。但

  • 问题内容: 我的字符串中某些地方包含数字,并且我正尝试用其单词符号替换此数字(即3-> 3)。我有一个功能可以做到这一点。现在的问题是找到字符串中的数字,同时保持字符串的其余部分不变。为此,我选择使用该函数,该函数可以接受“ callable”。但是,传递给它的对象是内部对象,我不确定如何处理它。我的函数接受数字或其字符串表示形式。 我应该如何编写一些辅助函数,该函数可用于将调用与执行所需处理的函