当前位置: 首页 > 知识库问答 >
问题:

如何在 PySpark 中联接两个数据帧时解析重复的列名?

公孙宏远
2023-03-14

我有一个完全相同的文件A和B。我试图在这两个数据帧上执行内部和外部连接。因为我将所有的列都作为重复的列,所以现有的答案没有任何帮助。我遇到的其他问题包含一两个重复的列,我的问题是整个文件都是彼此重复的:无论是数据还是列名。

我的代码

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")

df = spark.read.orc(
    's3://****'
)
print("First dataframe read with headers set to True")
df2 = spark.read.orc(
    's3://****'
)
print("Second dataframe read with headers set to True")

# df3 = df.join(df2, ['c_0'], "outer")

# df3 = df.join(
#     df2,
#     df["column_test_1"] == df2["column_1"],
#     "outer"
# )

df3 = df.alias('l').join(df2.alias('r'), on='c_0') #.collect()

print("Dataframes have been joined successfully.")
output_file_path = 's3://****'
)

df3.write.orc(
    output_file_path
)
print("Dataframe has been written to csv.")
job.commit()

我面临的错误是:

pyspark.sql.utils.AnalysisException: u'Duplicate column(s): "c_4", "c_38", "c_13", "c_27", "c_50", "c_16", "c_23", "c_24", "c_1", "c_35", "c_30", "c_56", "c_34", "c_7", "c_46", "c_49", "c_57", "c_45", "c_31", "c_53", "c_19", "c_25", "c_10", "c_8", "c_14", "c_42", "c_20", "c_47", "c_36", "c_29", "c_15", "c_43", "c_32", "c_5", "c_37", "c_18", "c_54", "c_3", "__created_at__", "c_51", "c_48", "c_9", "c_21", "c_26", "c_44", "c_55", "c_2", "c_17", "c_40", "c_28", "c_33", "c_41", "c_22", "c_11", "c_12", "c_52", "c_6", "c_39" found, cannot save to file.;'
End of LogType:stdout

共有3个答案

万修然
2023-03-14

我做了这样的事情,但在斯卡拉中,你也可以将其转换为pyspark...

>

  • 重命名每个数据帧中的列名

    dataFrame1.columns.foreach(columnName => {
      dataFrame1 = dataFrame1.select(dataFrame1.columns.head, dataFrame1.columns.tail: _*).withColumnRenamed(columnName, s"left_$columnName")
    })
    
    dataFrame1.columns.foreach(columnName => {
      dataFrame2 = dataFrame2.select(dataFrame2.columns.head, dataFrame2.columns.tail: _*).withColumnRenamed(columnName, s"right_$columnName")
    })
    

    现在,通过提及列名来< code>join

    resultDF = dataframe1.join(dataframe2, dataframe1("left_c_0") === dataframe2("right_c_0"))
    

  • 司马念
    2023-03-14

    下面是一个帮助函数,通过添加别名连接两个数据帧:

    def join_with_aliases(left, right, on, how, right_prefix):
        renamed_right = right.selectExpr(
            [
                col + f" as {col}_{right_prefix}"
                for col in df2.columns
                if col not in on
            ]
            + on
        )
        right_on = [f"{x}{right_prefix}" for x in on]
        return left.join(renamed_right, on=on, how=how)
    

    这里有一个如何使用它的例子:

    df1 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))
    df2 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))
    
    join_with_aliases(
       left=df1,
       right=df2,
       on=["id"],
       how="inner",
       right_prefix="_right"
    ).show()
    
    +---+-----+------------+
    | id|value|value_right|
    +---+-----+------------+
    |  1|    a|           a|
    |  3|    c|           c|
    |  2|    b|           b|
    +---+-----+------------+
    
    季凡
    2023-03-14

    这里没有捷径。Pyspark期望左数据帧和右数据帧具有不同的字段名集(连接键除外)。

    一种解决方案是在每个字段名称前面加上“left_”或“right_”,如下所示:

    # Obtain columns lists
    left_cols = df.columns
    right_cols = df2.columns
    
    # Prefix each dataframe's field with "left_" or "right_"
    df = df.selectExpr([col + ' as left_' + col for col in left_cols])
    df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])
    
    # Perform join
    df3 = df.alias('l').join(df2.alias('r'), on='c_0')
    
     类似资料:
    • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

    • 我有两个数据帧df1(Employee表) 和 在我连接了df1.dept_id和df2.id上的这两个表之后: 同时将其保存在文件中, 它给出错误: 我读过有关使用字符串序列来避免列重复的信息,但这适用于要对其执行连接的列。我需要对未连接的列具有类似的功能。 有没有一种直接的方法可以将重复列嵌入表名以便保存? 我想出了一个解决方案,匹配两个df的列,并重命名重复的列,将表名附加到列名上。但是有直

    • 我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确

    • 我有一个数据框,它是带有json字符串的json列。下面是一个例子。共有3列-a、b、c。c列为stringType 我想把它们变成数据帧(pivot)的列。下面的例子-

    • 我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。

    • 问题内容: 我在Databricks工作。 我有一个包含500行的数据框,我想创建两个包含100行的数据框,另一个包含剩余的400行。 我尝试了以下操作,但收到错误消息 问题答案: 最初,我误会了,并以为您想分割这些列。如果要选择行的子集,一种方法是使用创建索引列。从文档: 保证生成的ID是单调递增且唯一的,但不是连续的。 您可以使用此ID对数据框进行排序,并使用该ID对其子集进行排序,以确保准确