当前位置: 首页 > 知识库问答 >
问题:

最后分区上的Spark转换非常慢

蒲昀
2023-03-14
dataRDD_of_20000_partitions.aggregateByKey(zeroOp)(seqOp, mergeOp)
    .mapValues(...)
    .coalesce(1000, true)
    .collect()

这里,aggregatebykey对我前面分配的键(1到N)进行聚合。我可以合并分区,因为我知道我需要的分区数量,并将coalesce shuffle设置为true,以平衡分区。

有人能指出这些转换可能导致RDD最后几个分区处理缓慢的一些原因吗?我想知道这是否与数据偏斜有关。

共有1个答案

闾丘博
2023-03-14

我有一些观察。

>

  • 您应该有正确数量的分区,以避免数据偏斜。我怀疑您的分区少于所需的分区数。看看这个博客。

    collect()调用,将整个RDD提取到单个驱动程序节点中。

  •  类似资料:
    • 我正在查看代码中的一个错误,其中一个数据框被分成了太多的分区(超过700个),当我试图将它们重新分区为48个时,这会导致太多的洗牌操作。我不能在这里使用coalesce(),因为我想在重新分区之前首先拥有更少的分区。 我正在寻找减少分区数量的方法。假设我有一个 spark 数据帧(具有多个列),分为 10 个分区。我需要根据其中一列进行 orderBy 转换。完成此操作后,生成的数据帧是否具有相同

    • 我有一个带有复合分区键的 cassandra 表(time_bucket 时间戳,节点 int)。time_bucket值是插入数据的时间,秒转换为 00,节点值范围为 0 到 100 spark作业每分钟运行一次,从表中提取数据。该表包含近2500万条记录,每分钟都有记录被添加。 如果我的 Spark 作业每次运行时都选择所有记录,则作业将在 2 分钟内完成。但是如果我使用: s < code

    • 我在代码上收到一条错误消息,以查找支付200美元佣金的员工的总工资。一旦输入了所有员工的总销售额,就应该打印出属于每个不同薪酬类别的员工销售额。下面是代码: 这是我收到的确切错误消息: 我相信这与双重转换有关,但我不确定这有什么问题?有没有人能帮我搞清楚哪里出问题了(它编译没有错误)?我也尝试过只有双精度(包括数组),但这并没有解决问题。

    • 我有以下制表符分隔的示例数据集: 我正在对此数据运行一些转换,最终数据位于spark dataset中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。 我的scala函数来保存TSV数据集。 在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod

    • 问题内容: 简单的问题,但我花了一个多小时。我的代码如下。我需要使SomeClass sc动态化。因此,您将类名作为字符串传递给函数,然后使用该类代替静态someClass。怎么做呢? 我想要的是 编辑:上面是简化。实际的代码是这个 另一个简单的改写:我使用request.getRequest()得到一个对象。我不知道那个物体是什么。因此,我需要将其强制转换为提供的类字符串名称。怎么做?就这样。–

    • 我是scala/sark世界的新手,最近开始了一项任务,它读取一些数据,处理数据并将其保存在S3上。我阅读了一些关于stackoverflow的主题/问题,这些主题/问题涉及重分区/合并性能和最佳分区数(如本例)。假设我有正确的分区数,我的问题是,在将rdd转换为数据帧时,对它进行重新分区是个好主意吗?下面是我的代码目前的样子: 这是我打算做的(过滤后重新分区数据): 我的问题是,这样做是个好主意