当前位置: 首页 > 知识库问答 >
问题:

Spark emr重新分配计算合并

邢璞
2023-03-14

我有一个新手火花问题。下面是我试图执行的程序:

dataset
 .join(anotherDataset)
 .repartition(7000)
 .flatmap(doComputation)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .saveToS3()

我试图做的计算(doComput)是昂贵的,因为内存限制,我决定将数据集重新分区为7000个分区(即使我有1200个执行器,所以一次只能执行1200个)。然而,在完成计算后,我尝试写入s3,它大部分工作正常,但很少有任务最终超时,工作被重试。

1) 为什么在我保存在进行昂贵计算后生成的RDD时,整个作业都会被重试?

2) 我试图在持久化之后合并,但spark只是忽略了重新分区,只执行了500个任务:

dataset
 .join(anotherDataset)
 .repartition(7000)
 .flatmap(doComputation)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .coalesce(500)
 .saveToS3()

有什么方法可以进行重新分区(7000)-计算(在这7000个分区上)-合并(500),然后写入s3吗?

共有1个答案

韩晋
2023-03-14
  1. 共享作业的DAG可视化?您应该在stderr中看到一些异常(丢失的执行者)等。
  2. coalesc应该在重新分区时最小化数据移动。它忽略了重新分区,因为coalesc在重新分区时最小化了洗牌。
 类似资料:
  • 这是小提琴。我想要的是在删除一行时重新计算总数。我试过这个,但不起作用: 下面是我的代码: null null 以下是工作的内容:http://jsfidle.net/joansuriel/qmhdl/50//

  • 定义一个Java方法,该方法将三个整数值(一个月、一天和一年)作为参数,并返回一年中该天的数量(1到365之间的整数,或者闰年为366)。您的方法应具有以下标题:public static int dayNum(int month,int day,int year)要计算给定日期的天数,请使用以下公式: A. day Num=31*(月-1)天 b.如果月份在2月(2)之后,则从day Num中减

  • 我正在 pyspark 中对 (x,y) 点的 RDD 实现范围查询。我将 xy 空间划分为一个 16*16 的网格(256 个单元格),并将 RDD 中的每个点分配给其中一个单元格。gridMappedRDD 是一个 PairRDD: 我将这个RDD分区为256个分区,使用: 范围查询是一个矩形框。我为我的网格对象准备了一个方法,它可以返回与查询范围重叠的单元格id列表。所以,我用这个作为过滤器

  • 我想根据属性“amount”(空字符串)将对象分成两部分 } 对此 我即将解决它,但经过多次尝试(推送、分配、映射),它仍然不起作用。谢谢。

  • 问题内容: 我有一个问题,我通过沿行轴串联(垂直堆叠)来生成熊猫数据框。 每个组成数据帧都有一个自动生成的索引(升序编号)。 串联后,我的索引被搞砸了:它的计数最多为n(其中n是相应数据帧的shape [0]),并在下一个数据帧从零重新开始。 我正在尝试“根据给定的当前顺序重新计算索引”或“重新索引”(或者我认为)。事实证明,这似乎并没有在做。 这是我尝试做的事情: 它失败并显示“无法从重复的轴重

  • 示例-现在假设我们有一个输入RDD输入,它在第二步中被过滤。现在我想计算过滤后的RDD中的数据大小,并考虑到块大小为128MB,计算需要多少分区才能重新分区 这将帮助我将分区数传递给重新分区方法。 问题 1.如何计算XX的值? SparkSQL /DataFrame有Q2.What类似的方法?