当前位置: 首页 > 知识库问答 >
问题:

Spark df不选择嵌套字段作为列名

董良策
2023-03-14

我正试图创建一个Spark df,它包含顶级字段和嵌套字段,这些字段来自包含与json对象的键和值相对应的键和值的字典列表,我在选择嵌套列时遇到了问题。

以下是我目前掌握的情况:

输入是包含JSON值的字典列表:

[{
  "uid": 98763,
  "estimatedGrade": {
    "science": 10.03,
    "english": 20.5,
   },
  "actualGrade":  {
    "science": 10.03,
    "english": 20.5,
   }
}]

printed schema:
 |-- uid: long (nullable = true)
 |-- actualGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)
 |-- estimatedGrade: struct (nullable = true)
 |    |-- science: double(nullable = true)
 |    |-- english: double (nullable = true)

期望输出:

*请注意,我不需要重命名列,但必须缩短它们以适应一行

这是我到目前为止的代码:


    #jsons contains list of dict with the json key/values
    df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
    df = self._spark.read.json(df, multiLine=True)
    
    logger.info("Df count: %s", df.count())
    logger.info("Df table schema: %s", df.printSchema())
    
    columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
    
    df.select([column_header for column_header in df.columns if column_header in columns])

我只能选择uid,这是顶级字段,所以我猜我在选择嵌套值时出错了。

请帮忙。

共有1个答案

颜修真
2023-03-14

df.columns只返回顶级列名。您可以通过在使用您提供的数据示例创建的df上运行它来检查这一点。它返回:['有值','估计值','uid']

目前我知道的唯一好方法是迭代df.schema。递归地,如果字段为StructType,则检查字段的嵌套列。

这里有一些代码,应该可以帮助您在路上。首先,导入结构类型

from pyspark.sql.types import StructType

然后,设置一些辅助函数。第一个是递归返回所有列名,包括使用点表示法的嵌套列。第二个辅助函数将列表展平。

def get_schema_field_name(field, parent=None):
  if type(field.dataType) == StructType:
    if parent == None:
      prt = field.name
    else:
      prt = parent+"."+field.name # using dot notation
    res = []
    for i in field.dataType.fields:
      res.append(get_schema_field_name(i, prt))
    return res
  else:
    if parent==None:
      res = field.name
    else:
      res = parent+"."+field.name
    return res

def flatten(S):
  if S == []:
    return S
  if isinstance(S[0], list):
    return flatten(S[0]) + flatten(S[1:])
  return S[:1] + flatten(S[1:])

然后,循环遍历您的模式并使用上面的方法获取所有列(包括嵌套列)。

column_list = []
for j in df.schema:
  column_list.append(get_schema_field_name(j))
column_list = flatten(column_list)

最后,替换select语句中的df.columns部分。

columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']

df.select([column_header for column_header in column_list if column_header in columns])

在测试Databricks 8.2(Spark 3.1.1)时工作。

我还使用这种方法来列出Spark中所有表中的所有列名,因此请随时查看本文以获得进一步参考:https://medium.com/helmes-people/how-to-view-all-databases-tables-and-columns-in-databricks-9683b12fee10

 类似资料:
  • 这是我在mongo的文档: 我需要更新内部房间阵列中的对象。我尝试了一个选择匹配元素的查询没有语法错误,但出现了一个错误: “errmsg”:“字段“calendar.0.rooms.0.price”必须是数组,但在文档{u id:ObjectId('5cd26a886458720f7a66a3b8')中为字符串类型”, 这是我的疑问: 这是我在StackOverflow中找到的一些参考,但没有帮

  • 我有一个带有varchar类型电子邮件字段的表。 除表中的一封电子邮件外,其他所有电子邮件都能正常工作。

  • 问题内容: 我有一个看起来像这样的mysql表: 我希望能够查询得到多行,每一列一个。例如: 这会给我: 我需要用什么来制定这样的查询? 问题答案: 您必须首先将每个指定字段的数据应用于结果。 询问 签 出 SQLFIDDLE

  • 问题内容: 我创建了一个客户c#DropDownList控件,可以将其内容呈现为optgroup(不是从头开始,我编辑了一些在Internet上找到的代码,尽管我确切地了解了它的作用),并且工作正常。 但是,我现在遇到一种情况,我需要在下拉菜单中有两个缩进级别,即 但是,在上面的示例代码段中,它呈现的缩进量与相同。 有没有一种方法可以产生我想要的嵌套optgroup行为? 问题答案: 好的,如果有

  • 我的文档具有如下所示的嵌套字段: 嵌套字段的映射如下所示: 在切换到ElasticSearch2之前,我使用aggs查询了没有结果的文档。以下是查询的聚合部分: 现在我切换到了ElasticSerach2,它只计算所有文档。我已经尝试了不同的方法,比如计算所有文档和计算结果,这样我就可以减去结果,但是 总是0 如何正确筛选/计算嵌套字段?

  • 我有以下xml 我需要将其反序列化为以下POJO: 这里的问题是被包装在元素中