当前位置: 首页 > 知识库问答 >
问题:

仅在一个节点上运行的Dataproc Pyspark作业

赵景曜
2023-03-14

我的问题是我的pyspark作业没有并行运行。

代码和数据格式:
我的PySpark如下所示(显然是简化的):

class TheThing:
    def __init__(self, dInputData, lDataInstance):
        # ...
    def does_the_thing(self):
        """About 0.01 seconds calculation time per row"""
        # ...
        return lProcessedData

#contains input data pre-processed from other RDDs
#done like this because one RDD cannot work with others inside its transformation
#is about 20-40MB in size
#everything in here loads and processes from BigQuery in about 7 minutes
dInputData = {'dPreloadedData': dPreloadedData}

#rddData contains about 3M rows
#is about 200MB large in csv format
#rddCalculated is about the same size as rddData
rddCalculated = (
    rddData
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)

llCalculated = rddCalculated.collect()
#save as csv, export to storage

PySpark的全部要点是并行运行这个东西,显然不是这样。我在各种集群配置中运行了这些数据,最后一个配置是大量的,这时我注意到它是单一节点使用的。因此,为什么我的工作需要很长时间才能完成,而时间似乎与集群规模无关。

所有较小数据集的测试在我的本地机器和集群上都没有问题。我真的只是需要高档。

编辑
我更改了
llcalculated=rddcalculated.collect()
#...保存到csv并导出

rddcalculated.saveastextfile(“gs://storage-bucket/results”)

而且只有一个节点还在做这项工作。

共有1个答案

呼延承平
2023-03-14

根据您是从GCS还是HDFS加载rdddata,默认拆分大小可能是64MB或128MB,这意味着您的200MB数据集只有2-4个分区。Spark这样做是因为典型的基本数据并行任务在数据中的搅动速度足够快,64MB-128MB意味着可能需要几十秒的处理时间,所以将并行度分成更小的块没有好处,因为启动开销将占主导地位。

在您的示例中,由于您与其他数据集连接,并且可能对每个记录执行相当重量级的计算,因此每MB的处理时间似乎要高得多。因此,您需要更多的分区,否则无论您有多少节点,Spark都不知道将其拆分为2-4个工作单元(如果每台机器都有多个核心,那么这些工作单元也可能被打包到一台机器上)。

因此,只需调用repartition:

rddCalculated = (
    rddData
        .repartition(200)
        .map(
            lambda l, dInputData=dInputData: TheThing(dInputData, l).does_the_thing()
        )
)
rddData = rddData.repartition(200)

或者,如果在读取时重新分区,您可能会有更好的效率:

rddData = sc.textFile("gs://storage-bucket/your-input-data", minPartitions=200)
 类似资料:
  • 我们有一个项目,其中我们有几个Jenkins作业:一种类型的作业运行交付(a), 一个只进行编译和单元测试的程序(B) 和 运行集成测试、静态代码分析等(C)的人。 我们在四个 Jenkins 节点(主节点三个从节点)上运行,我们的作业是声明性管道作业的混合,并在 Jenkins 作业中手动单击。 我们一次只想为每个节点运行一个集成测试构建。然而,我们希望运行尽可能多的交付(A)和代码质量(B)构

  • 不幸的是,我有一个工作是对RAM中的数据进行操作,但没有同步设置。我能看到的最简单的解决方案是让一个作业在所有节点上运行而不进行协调,就像使用一样。 是否有方法将作业配置为在LocalDataSourceJobStore下的所有节点上运行? 精确的定时并不重要,但作业必须每30分钟在每个节点上运行一次

  • 我编写了一个通用管道,它接受几个参数,以便将预定义GitHub存储库中的版本部署到特定节点。我想将这个管道托管在GitHub上的Jenkinsfile上,所以我将作业配置为使用“来自SCM的管道脚本”。事实是,当我尝试构建作业时,Jenkinsfile在每个节点上都被签出。是否可以仅在主节点上签出和执行Jenkinsfile,并按预期运行管道? 编辑:正如我之前所说,管道工作得很好,并且按照预期将

  • 问题内容: 我有一大堆具有相同标签的节点。我希望能够在Jenkins中运行一个作业,该作业在具有相同标签的 所有 节点上执行并同时执行。 我看到了在詹金斯中使用矩阵配置选项的建议,但我只能想到一个轴(标签组)。当我尝试运行该作业时,似乎它只执行一次而不是300次(该标签组中的每个节点1次)。 我的另一条轴应该是什么?还是…有一些插件可以做到这一点?我曾经尝试过NodeLabel参数插件,然后选择“

  • 问题内容: 我有一台运行带有220 GB内存的Ubuntu 14.04的服务器,我想在该服务器上运行elasticsearch。根据文档,一个节点不应具有超过32 GB的RAM,因此我想我必须在一台计算机上运行多个节点才能利用所有RAM。我正在考虑运行4个节点,每个节点具有28 GB的内存。 如何将其设置为ubuntu服务,以便例如在系统重新引导后自动恢复所有节点?我想我必须以某种方式编辑/etc

  • 我对在Mesos上测试Spark运行感兴趣。我在Virtualbox中创建了一个Hadoop2.6.0单节点集群,并在其上安装了Spark。我可以使用Spark成功地处理HDFS中的文件。