当前位置: 首页 > 知识库问答 >
问题:

如何在pyspark中按字母顺序排序嵌套结构的列?

章青青
2023-03-14

我有以下模式的数据。我想所有的列都应该按字母顺序排序。我希望它在pyspark数据帧中。

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)

下面的代码仅对外部列进行排序,而不对嵌套列进行排序。

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()

此代码后面的模式如下所示

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

(因为id处有下划线,所以它首先出现)

我想要的架构如下。(甚至地址内的列也应该排序)

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)

提前谢谢。

共有2个答案

卫逸春
2023-03-14

您可以首先用nicecolname展平DF* synthax。对于嵌套展平,您可以使用以下内容:如何展平Spark数据帧中的结构?。

然后,在扁平的数据框上,您可以创建一个新的structCol,输入列排序为:

from pyspark.sql import functions as F
address_cols = df.select(F.col("address.*")).columns
df = df.withColumn(address, F.struct(*sorted([F.col(c) for c in address_cols])))
df2 = df[sorted(df.columns)]

彭鸿文
2023-03-14

这是一个解决方案,应该适用于任意深度嵌套的structType,它不依赖于对任何列名进行硬编码。

为了演示,我创建了以下稍微复杂一些的模式,其中在地址列中有第二级嵌套。假设您的数据帧模式如下:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)

注意地址。包含2个无序子字段的zip字段。

您可以定义一个函数,该函数将递归地逐步遍历您的模式,并对字段进行排序以构建一个SQL选择表达式:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':

            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'], 
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols

在这个DataFrame的模式上运行它会产生(为了易读性,我将长的“地址”字符串分成两行):

print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
#        struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']

现在使用selectExpr对列进行排序:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
 类似资料:
  • 问题内容: 如何获得按字段排序的struct输出? 问题答案: A 是字段的 有序 集合。该包使用反射来获取值的字段和值,并按照定义它们的顺序生成输出。 因此,最简单的解决方案是在已经按字母顺序排列字段的位置声明类型: 如果您不能修改字段的顺序(例如,内存布局很重要),则可以通过为结构类型指定一个方法来实现接口: 所述包检查所传递的值工具,并且如果是的话,调用它的方法,以产生输出。 该解决方案的缺

  • 我有以下模式的数据框架。我希望包括嵌套字段在内的所有列都应该按字母顺序排序。我想要scala spark。 当我使用schema.sort(_. name)排序时,我得到以下模式(嵌套数组和结构类型字段没有排序) 我想要的模式如下所示。(即使是metadata1(ArrayType)和metadata2(structType)内的列也应该排序) 提前感谢。

  • 问题内容: 我对python中的数据结构有些困惑;,和。我正在尝试对一个简单的列表进行排序,可能是因为无法识别无法排序的数据类型。 我的清单很简单: 我的问题是这是什么类型的数据,以及如何按字母顺序对单词进行排序? 问题答案: 表示列表,表示元组和表示字典。您应该看一下官方的Python教程,因为这些是Python编程的基础知识。 您所拥有的是一个字符串列表。您可以像这样对它进行排序: 如您所见,

  • 问题内容: 我是Java的新手,正在尝试按字母顺序排列术语的arrayList。(一个术语定义为一个字符和一个整数)(例如 我的代码如下: 为什么这不起作用?以及我该如何完成呢?我的arrayList称为术语,填充有Term类型 问题答案: 您在这行代码中遇到的问题。您的课程不是So 的类型,这两个对象将基于哪个属性或条件方法? 您必须使您的类为Comparable类型。和,根据您的需要覆盖该方法

  • 问题内容: 我有一些带有名称字段的文档。我正在使用名称字段的分析版本进行搜索和排序。排序是在一个级别上进行的,即名称首先是按字母顺序排序的。但是在字母列表中,名称是按字典顺序而不是按字母顺序排序的。这是我使用的映射: 谁能提供相同的解决方案? 问题答案: 深入研究Elasticsearch文档,我偶然发现了这一点: 排序和排序规则 不区分大小写的排序 假设我们有三个用户文档,其名称字段分别包含Bo