当前位置: 首页 > 面试题库 >

Spark-将具有不同架构(列名称和序列)的DataFrame合并/合并到具有Master通用架构的DataFrame

皇甫鸿远
2023-03-14
问题内容

我尝试通过df.schema()将模式作为通用模式并将所有CSV文件加载到该模式,但是对于分配的模式失败,其他CSV文件的标题不匹配

任何建议,将不胜感激。如函数或Spark脚本中一样


问题答案:

据我了解。您想要合并/合并具有不同架构的文件(尽管是一个主架构的子集)..我编写了此函数UnionPro,我认为它很适合您的要求-

编辑 -添加了Pyspark版本

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
      allCols.toList.map(x => x match {
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      })
    }

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  }

这是它的样本测试-

    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")
    unionPro(List(aDF, bDF), spark).show

输出为-

+----+----+----+
|Name|  ID| Sal|
+----+----+----+
|   A|   1|null|
|   B|   2|null|
|   C|null|   1|
|   D|null|   2|
+----+----+----+

这是它的Pyspark版本-

def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
    """
    :param DFList:
    :param caseDiff:
    :return:
    This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
    Creates a Unioned DataFrame
    """
    inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x.lower) for x in df.columns]) for df in DFList]

    # "This Preserves Order ( OrderedDict0-----------------------------------"
    from collections import OrderedDict
    ## As columnNames ( String) are hashable
    masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))

    # Create masterSchema ignoring different Datatype & Nullable  in StructField and treating them same based on Name ignoring cases
    ignoreNullable = lambda x: StructField(x.name, x.dataType, True)

    import itertools


    # to get reliable results by groupby iterable must be sorted by grouping key
    # in sorted function key function( lambda) must be passed as named argument ( keyword argument)
    # but by Sorting now, I lost original order of columns. Hence I'll use masterColStrList while returning final DF
    masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
        sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
               key=lambda x: x.name),
        lambda x: x.name)])

    def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
        return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]

    # Create Empty Dataframe
    masterEmptyDF = spark.createDataFrame([], masterSchema)

    return reduce(lambda x, y: x.unionByName(y),
                  [df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList], masterEmptyDF).select(
        masterColStrList)


 类似资料:
  • 问题内容: 我有两个表(表A和表B)。 它们具有不同的列数-假设表A具有更多列。 如何合并这两个表,并为表B没有的列获取空值? 问题答案: 为具有较少列的表添加额外的列作为null

  • 如何将两个熊猫DataFrames合并到两个具有不同名称的列上,并保留其中一个列? 这提供了一个像这样的数据帧 但是很明显,我正在合并和,所以它们是相同的。我想让它看起来像这样。有什么干净的方法可以做到这一点吗? 我唯一能想到的方法是在合并之前将列重新命名为相同的列,或者在合并之后删除其中一个列。如果熊猫自动掉落其中一只,我会很高兴,或者我可以做类似的事情

  • 问题内容: 我可以在其他具有相同列名的数据框的右边追加一个数据框吗 问题答案: 您可以像这样连接两个数据框。 如果您正在寻找联盟,则可以执行以下操作。 Spark 2.0,已重命名为

  • 问题内容: 我肯定在这里错过了一些简单的事情。尝试在熊猫中合并具有相同列名的两个数据框,但右侧的数据框具有一些左侧没有的列,反之亦然。 我试着加入外部联接: 但这产生了: 我还指定了一个要连接的单列(例如on =“ id”),但是它复制了除“ id”以外的所有列,例如attr_1_x,attr_1_y,这并不理想。我也将整个列列表(有很多)传递给了“ on”: 产生: 我想念什么?我想获得一个带有

  • 我正在尝试创建一个传递给from_jsonAPI的structType模式,以便解析存储为JSON字符串的列。JSON数据包含一个Map,其中包含String键和struct类型的值,但每个struct的模式取决于键。 考虑这个JSON示例,其中“数据”列是一个具有值和的Map,并且每个值的架构都不同: 对于键“名称”,结构值有一个成员字段“first”。对于键“地址”,结构值有两个成员字段“St

  • 我讨论这个问题已经有一段时间了,但没有结果。这几乎是一个重复的问题,至少有一个其他的问题在这里,但我不能完全弄清楚如何做,确切地说,我正在寻找从网上相关的答案。 我有一个熊猫数据帧(我们称之为),看起来像: 其中是索引。我想将其转换为类似以下内容: 因此,基本上,每个对应于相同索引的都应该组合到一个列表(或一个集合,或一个元组)中,该列表成为对应索引的。并且,如图所示,在相似的索引行之间是相同的,