当前位置: 首页 > 知识库问答 >
问题:

结构类型/行的火花UDF

有宏峻
2023-03-14

我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")),  Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

似乎我需要行类型的UDF,类似

val u =  udf((x:Row) => x)
       >> Schema for type org.apache.spark.sql.Row is not supported

这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了:

spark.udf.register("foo", (x:Row)=> Row, sub_schema)
     <console>:30: error: overloaded method value register with alternatives: ...

共有3个答案

微生欣怡
2023-03-14

是的,你可以和UDF一起这样做。为简单起见,我以您的case类为例,通过给每个值加2来更改数组:

case class Root(subtable: Subtable)
case class Subtable(col1: Seq[Int], col2: String)

val df = spark.createDataFrame(Seq(
  Root(Subtable(Seq(1, 2, 3), "toto")),
  Root(Subtable(Seq(10, 20, 30), "tata"))
))

val myUdf = udf((subtable: Row) =>
  Subtable(subtable.getSeq[Int](0).map(_ + 2), subtable.getString(1))
)
val result = df.withColumn("subtable_new", myUdf(df("subtable")))
result.printSchema()
result.show(false)

将打印 :

root
 |-- subtable: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)
 |-- subtable_new: struct (nullable = true)
 |    |-- col1: array (nullable = true)
 |    |    |-- element: integer (containsNull = false)
 |    |-- col2: string (nullable = true)

+-------------------------------+-------------------------------+
|subtable                       |subtable_new                   |
+-------------------------------+-------------------------------+
|[WrappedArray(1, 2, 3),toto]   |[WrappedArray(3, 4, 5),toto]   |
|[WrappedArray(10, 20, 30),tata]|[WrappedArray(12, 22, 32),tata]|
+-------------------------------+-------------------------------+
宗政燕七
2023-03-14

你在正确的轨道上。在这种情况下,UDF将使您的生活轻松。正如您已经遇到的,UDF不能返回spark不知道的类型。所以基本上你需要返回一些spark可以轻松序列化的东西。它可能是一个<code>case类。因此,这里是您的代码的修改版本:

def main(args: Array[String]): Unit = {
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._
  val sub_schema = StructType(StructField("col1", ArrayType(IntegerType, false), true) :: StructField("col2", StringType, true) :: Nil)
  val schema = StructType(StructField("subtable", sub_schema, true) :: Nil)
  val data = Seq(Row(Row(Array(1, 2), "eb")), Row(Row(Array(3, 2, 1), "dsf")))
  val rd = spark.sparkContext.parallelize(data)
  val df = spark.createDataFrame(rd, schema)

  df.printSchema()
  df.show(false)

  val mapArray = (subRows: Row) => {
    // I prefer reading values from row by specifying column names, you may use index also
    val col1 = subRows.getAs[Seq[Int]]("col1")
    val mappedCol1 = col1.map(x => x * x) // Use map based on your requirements
    (mappedCol1, subRows.getAs[String]("col2")) // now mapping is done for col2
  }
  val mapUdf = udf(mapArray)

  val newDf = df.withColumn("col1_mapped", mapUdf(df("subtable")))
  newDf.show(false)
  newDf.printSchema()
}

请看一下这些链接,这些可能会给你更多的见解。

  1. 关于使用复杂架构的最全面的答案:https://stackoverflow.com/a/33850490/4046067
  2. Spark 支持的数据类型:https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
昌博易
2023-03-14

事实证明,您可以将结果架构作为第二个UDF参数传递:

val u =  udf((x:Row) => x, sub_schema)
 类似资料:
  • 我在我们的项目中使用了HDFS上的Apache spark和MapR。我们正面临着运行火花工作的问题,因为它在数据小幅增加后失败了。我们正在从csv文件中读取数据,做一些转换,聚合,然后存储在HBASE中。 请建议,如果上面的配置看起来很好,因为am geting的错误看起来像是要离开内存。

  • 数据-我使用XML中的许多附加列获取此类数据,并使用com。databricks spark-xml\u 2.11库,用于将xml数据转换为数据帧。 要求-必须从数组(struct)类型或列custom\u属性转换数据。示例中的custom\u属性,如示例输出所示。My struct有三个字段,分别命名为“\u VALUE”、“属性\u id”、“值”。我需要将属性id转换为列名称,数据为-检查“

  • 我正在寻找(一些测试没有成功)在不使用UDF的情况下从case语句返回元组结构类型,有什么方法可以做到这一点吗? 用例是:我们有两列依赖于case表达式中的相同条件,因此我们看到两个选项: 写入相同条件两次,但返回不同列(不需要) 写一次条件,但每次都返回2个值,这可以通过一个元组,然后我们将其拆分 我知道这可以使用UDF来完成,但是我们避免了UDF,因为被火花视为黑盒,因此它们是不可优化的,所以

  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 我对Apache Spark很陌生,有时仍在努力。我正在尝试导入一个非常复杂的json文件,并在将其保存到拼花文件之前将其展平。 我的json文件是一个存储树。 每个商店都可以有一个字段,该字段是一个帐户数组。一个帐户有3个必填字段和两个可选字段。所以我有一个数据框,它的字段可以有3种不同的类型。 在数据帧中导入文件并没有什么大不了的,但在扁平化过程中,我可能希望对两个数据帧进行联合,这两个数据帧

  • 我正在运行以下scala代码: 我知道firstStruct是structType,StructFields的一个名称是“name”,但在尝试强制转换时似乎失败了。我被告知spark/hive结构与scala不同,但为了使用structType,我需要 所以我想他们应该是同一种类型的。 我看了看这里:https://github.com/apache/spark/blob/master/sql/c