当前位置: 首页 > 知识库问答 >
问题:

从结构元素的嵌套数组创建Spark数据帧?

唐兴思
2023-03-14

我已经在Spark中读取了一个JSON文件。该文件具有以下结构:

   root
      |-- engagement: struct (nullable = true)
      |    |-- engagementItems: array (nullable = true)
      |    |    |-- element: struct (containsNull = true)
      |    |    |    |-- availabilityEngagement: struct (nullable = true)
      |    |    |    |    |-- dimapraUnit: struct (nullable = true)
      |    |    |    |    |    |-- code: string (nullable = true)
      |    |    |    |    |    |-- constrained: boolean (nullable = true)
      |    |    |    |    |    |-- id: long (nullable = true)
      |    |    |    |    |    |-- label: string (nullable = true)
      |    |    |    |    |    |-- ranking: long (nullable = true)
      |    |    |    |    |    |-- type: string (nullable = true)
      |    |    |    |    |    |-- version: long (nullable = true)
      |    |    |    |    |    |-- visible: boolean (nullable = true)

我创建了一个递归函数来使用嵌套结构类型的列来展平架构

def flattenSchema(schema: StructType, prefix: String = null):Array[Column]= 
        {
        schema.fields.flatMap(f => {
          val colName = if (prefix == null) f.name else (prefix + "." + f.name)

          f.dataType match {
            case st: StructType => flattenSchema(st, colName)
            case _ => Array(col(colName).alias(colName))
          }
        })
        }

val newDF=SIWINSDF.select(flattenSchema(SIWINSDF.schema):_*)

val secondDF=newDF.toDF(newDF.columns.map(_.replace(".", "_")): _*)

如何展平包含嵌套结构类型的ArrayType,例如engagementItems:数组(nullable=true)

感谢您的帮助。

共有1个答案

汪成仁
2023-03-14

这里的问题是您需要管理ArrayType的案例,并在将其转换为structType之后。因此,您可以为此使用Scala运行时转换。

首先,我将场景生成为下一个场景(顺便说一句,将其包含在您的问题中会非常有帮助,因为这样可以更容易地再现问题):

  case class DimapraUnit(code: String, constrained: Boolean, id: Long, label: String, ranking: Long, _type: String, version: Long, visible: Boolean)
  case class AvailabilityEngagement(dimapraUnit: DimapraUnit)
  case class Element(availabilityEngagement: AvailabilityEngagement)
  case class Engagement(engagementItems: Array[Element])
  case class root(engagement: Engagement)
  def getSchema(): StructType ={
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.ScalaReflection
    val schema = ScalaReflection.schemaFor[root].dataType.asInstanceOf[StructType]

    schema.printTreeString()
    schema
  }

这将打印出:

root
 |-- engagement: struct (nullable = true)
 |    |-- engagementItems: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- availabilityEngagement: struct (nullable = true)
 |    |    |    |    |-- dimapraUnit: struct (nullable = true)
 |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |-- constrained: boolean (nullable = false)
 |    |    |    |    |    |-- id: long (nullable = false)
 |    |    |    |    |    |-- label: string (nullable = true)
 |    |    |    |    |    |-- ranking: long (nullable = false)
 |    |    |    |    |    |-- _type: string (nullable = true)
 |    |    |    |    |    |-- version: long (nullable = false)
 |    |    |    |    |    |-- visible: boolean (nullable = false)

然后我通过为ArrayType添加额外的检查并使用asInstanceOf将其转换为structType来修改您的函数:

  import org.apache.spark.sql.types._  
  def flattenSchema(schema: StructType, prefix: String = null):Array[Column]=
  {
    schema.fields.flatMap(f => {
      val colName = if (prefix == null) f.name else (prefix + "." + f.name)

      f.dataType match {
        case st: StructType => flattenSchema(st, colName)
        case at: ArrayType =>
          val st = at.elementType.asInstanceOf[StructType]
          flattenSchema(st, colName)
        case _ => Array(new Column(colName).alias(colName))
      }
    })
  }

最后是结果:

val s = getSchema()
val res = flattenSchema(s)

res.foreach(println(_))

输出:

engagement.engagementItems.availabilityEngagement.dimapraUnit.code AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.code`
engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained`
engagement.engagementItems.availabilityEngagement.dimapraUnit.id AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.id`
engagement.engagementItems.availabilityEngagement.dimapraUnit.label AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.label`
engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking`
engagement.engagementItems.availabilityEngagement.dimapraUnit._type AS `engagement.engagementItems.availabilityEngagement.dimapraUnit._type`
engagement.engagementItems.availabilityEngagement.dimapraUnit.version AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.version`
engagement.engagementItems.availabilityEngagement.dimapraUnit.visible AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.visible`
 类似资料:
  • 我需要展平一个数据帧,以便将其与Spark(Scala)中的另一个数据帧连接起来。 基本上,我的2个数据帧有以下模式: 数据流1 DF2 老实说,我不知道如何使DF2变平。最后,我需要连接DF.field4 = DF2.field9上的2个数据帧 我用的是2.1.0 我的第一个想法是使用爆炸,但在Spark 2.1.0中已经被否决了,有人能给我一点提示吗?

  • 我需要创建一个嵌套数组,使用路径作为子节点的参考。例如:4.1是4岁的孩子,4.1.1是4.1岁的孩子,4.2是4岁的孩子...我有一个平面数组,里面有所有的数据和路径。如何创建嵌套数组的最佳方法,其中子数组根据其路径嵌套到其父数组。 输入: 输出: 最好的方法是递归的。对这个算法有什么建议吗?

  • 问题内容: 我已经开始使用https://mholt.github.io/json-to-go/将API JSON转换为go结构,但我真的很喜欢它,但是我仍然坚持如何在Report Definition结构中初始化Filters数组结构如下所示。 我似乎无法引用在Filters结构甚至是Filters结构中声明的项,以创建新的Filter项目并将其附加到Filters。 是否可以使用原样编写的Re

  • 我有下面的数据帧模式作为df.current模式,需要获得预期的模式作为df.expected模式,有没有一种方法,我可以在火花2.3实现这一点 df.current架构: df。预期架构: 示例数据: 注意:这里需要实现两件事: 为元素中的每个E、V对创建新字段SN,其值应为数组名称。例如:对于第一个数组列(ADA),SN的值=ADA 将阵列(ADA、ADW)合并为一个外部阵列(信号)

  • 元组和列表十分类似,只不过元组和字符串一样是 不可变的 即你不能修改元组。元组通过圆括号中用逗号分割的项目定义。元组通常用在使语句或用户定义的函数能够安全地采用一组值的时候,即被使用的元组的值不会改变。 使用元组 例9.2 使用元组 #!/usr/bin/python # Filename: using_tuple.py zoo = ('wolf','elephant','penguin') pr

  • 我的mongo数据库中有: 我想从这个数组中删除on元素。我用的是golang和mgo。v2驱动程序,下面是我的代码: 它继续成功,但不删除项目时,我检查mongob。有人能告诉我我做错了什么吗?谢谢你们