当前位置: 首页 > 面试题库 >

重命名Spark数据框中的嵌套字段

姚胡媚
2023-03-14
问题内容

df在Spark中有一个数据框:

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

如何将字段重命名array_field.aarray_field.a_renamed

[更新]:

.withColumnRenamed() 不适用于嵌套字段,因此我尝试了这种hacky和不安全的方法:

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

我知道设置私有属性不是一个好习惯,但我不知道其他为df设置架构的方法

我觉得我是在一个正确的轨道,但df.printSchema()仍显示为旧名array_field.a,虽然df.schema == schemaTrue


问题答案:

蟒蛇

无法修改单个嵌套字段。您必须重新创建一个整体结构。在这种特殊情况下,最简单的解决方案是使用cast

首先是一堆进口商品:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

和示例数据:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])

让我们确认模式与您的情况相同:

df.printSchema()



root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

您可以将新模式定义为例如字符串:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()



root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

DataType

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

 df.select(col("array_field").cast(struct_schema)).printSchema()



root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

斯卡拉

可以在Scala中使用相同的技术:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

要么

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
    StructField("a_renamed", StringType),
    StructField("b", LongType),
    StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

可能的改进

如果您使用表现力的数据操作或JSON处理库,则将数据类型转储到dictJSON字符串并从那里获取数据会更容易,例如(Python /
toolz):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Update name to "a_updated" if name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_updated" if x == "a" else x)

updated_schema = pipe(
   #  Get schema of the field as a dict
   df.schema["array_field"].jsonValue(),
   # Update fields with rename
   update_in(
       keys=["type", "elementType", "fields"],
       func=lambda x: pipe(x, map(rename_field), list)),
   # Load schema from dict
   StructField.fromJson,
   # Get data type
   attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()


 类似资料:
  • 我需要展平一个数据帧,以便将其与Spark(Scala)中的另一个数据帧连接起来。 基本上,我的2个数据帧有以下模式: 数据流1 DF2 老实说,我不知道如何使DF2变平。最后,我需要连接DF.field4 = DF2.field9上的2个数据帧 我用的是2.1.0 我的第一个想法是使用爆炸,但在Spark 2.1.0中已经被否决了,有人能给我一点提示吗?

  • 问题内容: 我需要在此重命名: 我试过了 但返回:。 问题答案: 如文档中所述,无法使用单个命令直接重命名数组中的字段。您唯一的选择是遍历收集文档,阅读它们并使用$ unset old / $ set new操作更新每个文档。

  • 我正在处理一个项目,它有多个进程,每个进程都有不同的数据项要处理。数据项(对于不同的进程)有不同的列(但对于相同的进程总是相同的列)。 一开始,我假设为所有流程创建一个表是很好的,然后,每当创建一个新流程时,也可以创建另一个包含项目数据的表,但事实证明,将会有一种新的流程方法来经常创建新的表。然后我研究了嵌套表,但发现MySQL中没有嵌套表的概念。(我听说这可以用MariaDB完成,有人用过吗?)

  • 问题内容: 我可以使用此查询找出重复的数据 我能够获取重复数据。我只需要知道如何使用名称将重复数据重命名为 new 问题答案: 假设您在表上有某种主键,例如自动增量ID,则可以执行以下操作。 为了说明,它将查找重复的任何内容,为该集中的所有内容获取最大ID,并在其末尾附加“副本1”。如果您有3次或多次使用某些名称,则可能仍会留下一些重复项。只需再次运行它,这次使用“副本2”而不是“副本1”。继续重

  • 我正试图创建一个Spark df,它包含顶级字段和嵌套字段,这些字段来自包含与json对象的键和值相对应的键和值的字典列表,我在选择嵌套列时遇到了问题。 以下是我目前掌握的情况: 输入是包含JSON值的字典列表: 期望输出: *请注意,我不需要重命名列,但必须缩短它们以适应一行 这是我到目前为止的代码: 我只能选择uid,这是顶级字段,所以我猜我在选择嵌套值时出错了。 请帮忙。

  • 有没有比调用多个帧更好的方法来同时为给定的 SparkSQL 添加前缀或重命名所有或多个列? 例如,如果我想检测更改(使用完全外连接)。然后我剩下两个具有相同结构的< code >数据帧。