当前位置: 首页 > 知识库问答 >
问题:

从UDF中的SparkSQL行中提取嵌套数组

苏嘉歆
2023-03-14

我正在处理数据帧,需要提取数据。我有许多嵌套的级别,所以我使用分解和选择来创建第一个级别,但随后我对嵌套级别使用UDF。

我有一个UDF,它取Root.Obj,这是一个数组,我希望它返回一个数组[MyObj]
我的输出类:

case class MyObj(fieldA: Boolean, fieldB: String, fieldC: Array[MyNested])
case class MyNested(field1: Long, field2: String)

简而言之,这是输入模式:

 |-- Obj: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- FieldA: boolean (nullable = true)
 |    |    |-- FieldB: string (nullable = true)
 |    |    |-- FieldC: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Field1: long (nullable = true)
 |    |    |    |    |-- Field2: string (nullable = true)
 |    |    |-- FieldD: boolean (nullable = true)

我的自定义项:

def extractObjs: UserDefinedFunction = udf {
  (objs: Seq[Row]) ⇒
    objs.map {
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.get???("FieldC"))
        )
    }
}

def extractNested(nesteds: ???): Array[MyNested] = {
  ???
}

这是更复杂的IRL,因为我需要从其他地方检索值,并且有更多的嵌套数组。此外,Obj和FieldC的输入结构比这里复杂得多,我不能(或不想)为它们创建case类。因为我需要在多个地方这样做,所以假设我不知道FieldC元素的“结构”。

我的问题是提取“FieldC”数组。我想要一个Seq[Row],但我无法实现,getSTRt只给我一个Row,getSeq[Row]在抛出错误之后,因为scala.collection.mutable.WrapedArray$ofRef不能强制转换为org.apache.spark.sql.Row

共有1个答案

花阳秋
2023-03-14

结构映射到UDF中的行,因此可以通过以下方式访问结构数组:

def extractObjs: UserDefinedFunction = udf {
  (objs: Seq[Row]) ⇒
    objs.map {
      obj ⇒
        MyObj(
          obj.getAs[Boolean]("FieldA"),
          obj.getAs[String]("FieldB"),
          extractNested(obj.getAs[Seq[Row]]("FieldC"))
        )
    }
}

def extractNested(nesteds: Seq[Row]): Array[MyNested] = {
  nesteds.map(r => MyNested(r.getAs[Long]("Field1"),r.getAs[String]("Field2"))).toArray
}
 类似资料:
  • 我将json数据流(从ConvertAvroToJSON处理器驱动)转义为: 我只需要“JSON”值,如下所示[unescapeJson之后]: $...*

  • 从嵌套列表中提取数据的Jolt规范是什么

  • 这里我有两个文档,我需要查询以获得和我只想获得符合以上条件的注释。并非所有评论。 ' { “_id”: ObjectId(“53b7f2383ed7755c2400002e”), “title”: “Post One”, “author”: “bob”, “posted”: ISODate(“2014-07-05T12:40:24.0Z”), “pageViews”: NumberInt(5),

  • 问题内容: 问题 我想在Java中创建一个用户定义函数,可以将其称为Apache Spark运算符链中的Java方法。我在查找不需要UDF存在于SQL查询中的Java示例时遇到了麻烦。 版本号 Java 8 斯卡拉2.10.6 为Hadoop 2.6.0预先构建的Apache Spark 1.6.0 我尝试过的方法 我可以用Java成功创建UDF。但是,除非在SQL查询中,否则无法使用它: 我被困

  • 我有一个xml文档,它有多个。我能够得到和帐户的详细信息(,等。我很难得到像card_type、年、月、first_six等的东西。 这个文档中有200个事务,因此是循环。 我在尝试代码时遇到以下错误: 以下是我正在尝试的: