当前位置: 首页 > 知识库问答 >
问题:

使用Python的duce()加入多个PySpark DataFrames

南宫浩皛
2023-03-14

有谁知道为什么使用Python3的fuctools.reduce()会导致在加入多个PySpark数据帧时的性能比仅仅使用for循环迭代加入相同的数据帧更差?具体来说,这会导致速度大幅变慢,然后出现内存不足错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

然而这个没有:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

任何想法将不胜感激。谢谢!

共有2个答案

东郭自珍
2023-03-14

一个原因是reduce或fold通常在功能上是纯的:每次累加操作的结果不会写入内存的同一部分,而是写入内存的一个新块。

原则上,垃圾收集器可以在每次累积后释放前一个块,但如果没有,您将为累加器的每个更新版本分配内存。

高宇定
2023-03-14

只要您使用CPython(不同的实现可以,但实际上不应该,在这种特定情况下表现出明显不同的行为)。如果你看一下< code>reduce的实现,你会发现它只是一个带有最少异常处理的for循环。

核心完全等效于您使用的循环

for element in it:
    value = function(value, element)

没有证据支持任何特殊行为的说法。

此外,简单的帧数测试Spark连接的实际限制(连接是Spark中最昂贵的操作之一)

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

显示直接for循环之间的定时没有显着差异

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

和< code>reduce调用

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

类似地,整体 JVM 行为模式在 for 循环之间是可比较的

用于循环CPU和内存使用-VisualVM

和< code >减少

减少 CPU 和内存使用率 - 可视化 VM

最后,两者都生成相同的执行计划

g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

这表明在评估计划和可能发生OOM时没有区别。

换句话说,相关性并不意味着因果关系,并且观察到的性能问题不太可能与您用于组合 DataFrame 的方法相关。

 类似资料:
  • 我试图导入多个CSV文件在一个特定的路径到数据集HDF5文件使用此代码: 但我有一个错误: 第15行,在帧=pd.concat(li,轴=0,ignore_index=True)文件/usr/本地/lib/python3.7/site-包/熊猫/核心/重塑/concat.py,第281行,在统一排序=排序,文件/usr/本地/lib/python3.7/site-包/熊猫/Core/reshape

  • 我有4个rdd类型的RDD:((int,int,int),values),我的rdds是 如何加入RDD,比如rdd1加入“A”上的rdd2;rdd1加入“B”上的rdd2;rdd1加入“C”上的rdd3 那么在Scala中的输出是? 例 输出应该是

  • 我想知道是否可以创建一个从多个表中获取数据的实体。 我有一个基于TableA的实体。 特殊性是,我还需要TableD的数据,它链接到TableC,它链接到TableB最后链接到TableA,带有TableA id 我完全知道我可以在表B、C、D上创建一个实体。但我不需要这些表上的任何数据,除了TableD。B和C只是“一条要走的路”。 http://img11.hostingpics.net/pi

  • 问题内容: 我需要从数组中的多个位置提取数据。 一个简单的数组是: 我对切片很熟悉。例如:- 会给我:- 但是,我无法摆脱多个层面。例如:- 给我 尽管搜索了两本Python书籍和Internet,但我无法确定要使用的语法。 问题答案: 您可以切片两次并加入它们。

  • 问题内容: 我的计算机上同时安装了Python 3.3和Python 2.7。python 3.3工作正常,但是当我尝试使用python 2.7运行某些程序时,它仍然引用python 3.3。 例如:如果输入,它将在3.3上运行并正常工作,但是如果输入,则会出现此错误: 注意:我已将Python 2.7和3.3 .exe分别重命名为python27.exe和python33.exe。 任何帮助将不

  • 问题内容: 我试图将来自多个文件的所有测试合并到一个文件中,如下所示: 我很确定这不是参加测试的最佳方式,我很难找到如何执行此操作的示例: 问题答案: 如果你想包含多个模块 到 您的层次结构就像你在你的问题做什么,你在做什么是相当多的 它 ,除非你想要写摩卡自定义测试装载机。编写自定义加载器不会比已有代码容易或使代码更清晰。 这是我将如何更改某些事情的示例。本示例中的子目录组织为: : 该功能只是