当前位置: 首页 > 知识库问答 >
问题:

对源Spark数据帧列重新排序,以匹配PySpark中目标数据帧的顺序

颛孙炜
2023-03-14

我有一个来自目标表的固定火花数据帧顺序:

目标火花数据帧(列1字符串、列2整数、列3字符串、列4双精度)

现在,如果源数据顺序混乱:

源火花数据帧(列3字符串、列2整数、列4双精度、列1字符串)。

如何使用PySpark重新排列源DataFrame以匹配目标DataFrame的列顺序?

源Spark数据帧应按如下方式重新排序,以匹配目标数据帧:

输出:

更新了源 Spark 数据帧(col1 字符串、列 2 整型、col3 字符串、col4 双精度型)

方案 2:

源数据帧=[a、c、d、e]

目标数据帧=[a,b,c,d]

在这种情况下,源数据帧应重新排列为[a,b,c,d,e]

    < li >保持目标列的顺序 < li >更改源列的数据类型以匹配目标数据框架 < li >在末尾添加新列 < li >如果源列中不存在目标列,则仍应添加该列,但用< code>null值填充。

在上面的示例中,在重新排列源数据帧后,它将添加一个带有值的 b 列。

这将确保当我们使用<code>saveAsTable

共有1个答案

戚衡
2023-03-14

假设您有以下两个DataFrames:

source.show()
#+---+---+---+---+
#|  a|  c|  d|  e|
#+---+---+---+---+
#|  A|  C|  0|  E|
#+---+---+---+---+

target.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  A|  B|  C|  1|
#+---+---+---+---+

具有以下数据类型:

print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]

print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]

如果我正确理解您的逻辑,以下列表理解应该适用于您:

from pyspark.sql.functions import col, lit

new_source = source.select(
    *(
        [
            col(t).cast(d) if t in source.columns else lit(None).alias(t) 
            for t, d in target.dtypes
        ] +
        [s for s in source.columns if s not in target.columns]
    )
)

new_source.show()

new_source.show()
#+---+----+---+---+---+
#|  a|   b|  c|  d|  e|
#+---+----+---+---+---+
#|  A|null|  C|  0|  E|
#+---+----+---+---+---+

结果输出将具有以下模式:

new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)

如您所见,列d的数据类型从string更改为integer以匹配目标表的模式。

逻辑是首先遍历< code>target中的列,如果它们存在于< code>source.columns中,则选择它们;如果不存在,则创建一个< code>null的列。然后添加< code>source中不存在于< code>target中的列。

 类似资料:
  • 我有一个超过200列的。问题在于订单生成时的状态 我需要按如下方式重新排列这些列: 在Python中有什么方法可以做到这一点吗?

  • 我有一个像这样的数据框- 我有一个这样的列表- 现在,我想根据列名列表对数据框进行排序 因此,新的数据框将有列名称-

  • 我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了以下代码: 但是,它会返回此错误: 使用 Spark 的默认 log4j 配置文件:组织/缓存/火花/log4j-defaults.属性 将默认日志级别设置为“WARN”。要调整日志记录级别,请使用 sc.setLogLevel(新级别)。对于 SparkR,请使用 setLogLevel(新级别)。18/01/08 10:

  • 我正在用PySpark DataFrames分析一些数据。假设我有一个正在聚合的数据帧< code>df: 这将给我: 聚合工作得很好,但我不喜欢新的列名。有没有办法将此列重命名为人类可以从方法中读取的内容?也许更类似于中的操作:

  • 基本上,它应该在步骤中找到指标为43且步骤=1的行,然后将该值放在新列中,在这种情况下,它将是“Gross value Added”。任何帮助都将非常感谢!

  • 我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde