我正试图创建一个Spark df,它包含顶级字段和嵌套字段,这些字段来自包含与json对象的键和值相对应的键和值的字典列表,我在选择嵌套列时遇到了问题。
以下是我目前掌握的情况:
输入是包含JSON值的字典列表:
[{
"uid": 98763,
"estimatedGrade": {
"science": 10.03,
"english": 20.5,
},
"actualGrade": {
"science": 10.03,
"english": 20.5,
}
}]
printed schema:
|-- uid: long (nullable = true)
|-- actualGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
|-- estimatedGrade: struct (nullable = true)
| |-- science: double(nullable = true)
| |-- english: double (nullable = true)
期望输出:
*请注意,我不需要重命名列,但必须缩短它们以适应一行
这是我到目前为止的代码:
#jsons contains list of dict with the json key/values
df = self._spark.sparkContext.parallelize(jsons).map(lambda x: json.dumps(x))
df = self._spark.read.json(df, multiLine=True)
logger.info("Df count: %s", df.count())
logger.info("Df table schema: %s", df.printSchema())
columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
df.select([column_header for column_header in df.columns if column_header in columns])
我只能选择uid,这是顶级字段,所以我猜我在选择嵌套值时出错了。
请帮忙。
df.columns只返回顶级列名。您可以通过在使用您提供的数据示例创建的df上运行它来检查这一点。它返回:['有值','估计值','uid']
。
目前我知道的唯一好方法是迭代df.schema。递归地,如果字段为StructType,则检查字段的嵌套列。
这里有一些代码,应该可以帮助您在路上。首先,导入结构类型
from pyspark.sql.types import StructType
然后,设置一些辅助函数。第一个是递归返回所有列名,包括使用点表示法的嵌套列。第二个辅助函数将列表展平。
def get_schema_field_name(field, parent=None):
if type(field.dataType) == StructType:
if parent == None:
prt = field.name
else:
prt = parent+"."+field.name # using dot notation
res = []
for i in field.dataType.fields:
res.append(get_schema_field_name(i, prt))
return res
else:
if parent==None:
res = field.name
else:
res = parent+"."+field.name
return res
def flatten(S):
if S == []:
return S
if isinstance(S[0], list):
return flatten(S[0]) + flatten(S[1:])
return S[:1] + flatten(S[1:])
然后,循环遍历您的模式并使用上面的方法获取所有列(包括嵌套列)。
column_list = []
for j in df.schema:
column_list.append(get_schema_field_name(j))
column_list = flatten(column_list)
最后,替换select语句中的df.columns部分。
columns = ['uid', 'estimatedGrade.science', 'estimatedGrade.english', 'actualGrade.science', 'actualGrade.english']
df.select([column_header for column_header in column_list if column_header in columns])
在测试Databricks 8.2(Spark 3.1.1)时工作。
我还使用这种方法来列出Spark中所有表中的所有列名,因此请随时查看本文以获得进一步参考:https://medium.com/helmes-people/how-to-view-all-databases-tables-and-columns-in-databricks-9683b12fee10
这是我在mongo的文档: 我需要更新内部房间阵列中的对象。我尝试了一个选择匹配元素的查询没有语法错误,但出现了一个错误: “errmsg”:“字段“calendar.0.rooms.0.price”必须是数组,但在文档{u id:ObjectId('5cd26a886458720f7a66a3b8')中为字符串类型”, 这是我的疑问: 这是我在StackOverflow中找到的一些参考,但没有帮
我有一个带有varchar类型电子邮件字段的表。 除表中的一封电子邮件外,其他所有电子邮件都能正常工作。
问题内容: 我有一个看起来像这样的mysql表: 我希望能够查询得到多行,每一列一个。例如: 这会给我: 我需要用什么来制定这样的查询? 问题答案: 您必须首先将每个指定字段的数据应用于结果。 询问 签 出 SQLFIDDLE
问题内容: 我创建了一个客户c#DropDownList控件,可以将其内容呈现为optgroup(不是从头开始,我编辑了一些在Internet上找到的代码,尽管我确切地了解了它的作用),并且工作正常。 但是,我现在遇到一种情况,我需要在下拉菜单中有两个缩进级别,即 但是,在上面的示例代码段中,它呈现的缩进量与相同。 有没有一种方法可以产生我想要的嵌套optgroup行为? 问题答案: 好的,如果有
我的文档具有如下所示的嵌套字段: 嵌套字段的映射如下所示: 在切换到ElasticSearch2之前,我使用aggs查询了没有结果的文档。以下是查询的聚合部分: 现在我切换到了ElasticSerach2,它只计算所有文档。我已经尝试了不同的方法,比如计算所有文档和计算结果,这样我就可以减去结果,但是 总是0 如何正确筛选/计算嵌套字段?
我有以下xml 我需要将其反序列化为以下POJO: 这里的问题是被包装在元素中