当前位置: 首页 > 知识库问答 >
问题:

合并排序连接如何在Spark中工作,为什么它会抛出OOM?

燕经国
2023-03-14

我想深入理解Spark中合并排序连接的概念。我理解总体思路:这与合并排序算法中的方法相同:取2个排序的数据集,比较第一行,写最小的一行,重复。我还了解如何实现分布式合并排序。

但我无法了解它是如何在Spark中实现的,涉及到分区和执行器的概念。

这是我的看法。

  1. 考虑到我需要连接2个表A和B。如果这很重要,可以通过Spark SQL从配置单元读取表
  2. 默认情况下,Spark使用200个分区
  3. Spark然后将计算连接键范围(从minKey(A,B)到maxKey(A,B)),并将其拆分为200个部分。这两个数据集将按键范围划分为200个部分:A分区和B分区
  4. 与同一密钥相关的每个A分区和每个B分区都被发送到同一执行器,并在那里相互分离进行排序
  5. 现在,200个执行者可以将200个A分区与200个B分区连接起来,并保证它们共享相同的密钥范围
  6. 通过合并排序算法进行连接:从A分区中获取最小的键,与B分区中的最小键进行比较,写入匹配或迭代
  7. 最后,我有200个连接的数据分区

这有意义吗?

问题:键偏斜。如果某个键范围包含50%的数据集键,一些执行程序会受到影响,因为太多的行会进入同一个分区。它甚至可能在OOM中失败,同时尝试在内存中对太大的A分区或B分区进行排序(我不明白为什么Spark不能像Hadoop那样对磁盘溢出进行排序?...)或者它失败是因为它试图将两个分区读入内存以进行连接?

所以,这是我的猜测。你能纠正我并帮助理解Spark的工作方式吗?

共有2个答案

曹光霁
2023-03-14

为什么Spark不能像Hadoop那样对磁盘溢出进行排序?

Spark merge sort join不会溢出到磁盘。看看Spark SortMergeJoinExec类,它使用ExternalAppendOnlyUnsafeRowArray,描述如下:

一个仅追加的数组,它严格地将内容保存在内存数组中,直到numRowsInMemoryBufferThreshold达到后,它将切换到一种模式,该模式将在numRowsSpillThreshold满足后刷新到磁盘(如果内存消耗过多,则在此之前)

这与在Web UI的连接操作期间看到任务溢出到磁盘的体验是一致的。

为什么[合并排序连接]可以抛出OOM?

从Spark内存管理概述:

Spark的洗牌操作(sortByKey、groupByKey、reduceByKey、join等)在每个任务中构建一个哈希表来执行分组,分组通常可能很大。这里最简单的修复方法是提高并行度,以便每个任务的输入集更小。

i、 e.在连接的情况下,增加火花。sql。洗牌分区(partitions),以减小分区和生成的哈希表的大小,并相应地降低OOM的风险。

孙莫希
2023-03-14

这是MPP数据库上连接的常见问题,Spark也不例外。正如您所说,要执行连接,同一个连接键值的所有数据都必须进行同位,因此,如果连接键值上的分布不均匀,则数据分布不均匀,一个节点过载。

如果连接的一侧很小,则可以使用地图侧连接。Spark query planner确实应该为您做到这一点,但它是可调的-不确定这是否最新,但它看起来很有用。

您是否在两个表上都运行了分析表?

如果两边都有一个不会破坏连接语义的键,那么可以将其包含在连接中。

 类似资料:
  • 我的任务是使用用户填充的int数组合并两个数组,我们必须假设用户最多有10000个输入,用户输入负数停止。然后将数组从最小到最大排序并打印出来。起初我以为这很容易,但当我完成时,我开始得到如下输出: 正如你所看到的,这六个是不合适的,我不知道如何修复它。这是源代码,我已经包括了大量的评论,因为我真的希望你们能帮助我尽你们最大的能力。如果可以使用相同的技术而不在代码中实现新的技术和方法,请这样做。我

  • 我正在复习快速排序的实现(来自CLRS第3版)。我发现数组的递归除法从低索引到中1,然后从中1到高。 合并排序的实现如下所示: 由于它们都使用相同的除法策略,为什么快速排序忽略中间元素从到和到没有包含,而mergesort包含?

  • 我正在维基百科上阅读关于外部排序的文章,我需要理解为什么两阶段合并比一阶段合并更有效。 Wiki:但是,单次合并有一个限制。随着区块数量的增加,我们将内存分成更多的缓冲区,因此每个缓冲区都较小,因此我们必须进行许多较小的读取,而不是较少的较大读取。 因此,对于100 MB内存中的50 GB的排序,使用单个合并过程是没有效率的:磁盘需要用500个数据块中的每个数据块(我们一次从每个数据块读取100M

  • 我正在尝试实现一个不能正常工作的mergesort算法。合并排序的工作方式如下: i、 将未排序的列表划分为n个子列表,每个子列表包含1个元素(1个元素的列表被视为已排序)。 ii.重复合并子列表以产生新排序的子列表,直到只剩下1个子列表。这将是已排序的列表。下面提供了实现。 最初,递归调用此方法,直到只有一个元素。 这是提供的合并方法。 这里的问题是什么? 输入是输出是

  • 我第一次用一个辅助数组实现了合并排序,以尝试使用JavaScript实现可视化。这似乎应该是有效的,但它不是。任何帮助或提示将不胜感激。 编辑:我忘了包括它不起作用的情况。它们是: 输入:[4, 2, 5, 6, 7, 7]输出:[4, 2, 5, 6, 7, 7] 输入:[6,6,6,4,6,2]输出:[4,6,6,6,6,2] 输入:[6, 7, 3, 10, 7, 9, 6, 3, 4, 6

  • 在index.hpp中,我创建了一个具有多个数据成员的类,如、等。我在类外部定义了一个构造函数。在program.cpp中,我创建了一个名为SAM的对象。当我试图编译它时,它显示错误。什么原因? Program.cpp index.hpp 错误信息