我想深入理解Spark中合并排序连接的概念。我理解总体思路:这与合并排序算法中的方法相同:取2个排序的数据集,比较第一行,写最小的一行,重复。我还了解如何实现分布式合并排序。
但我无法了解它是如何在Spark中实现的,涉及到分区和执行器的概念。
这是我的看法。
这有意义吗?
问题:键偏斜。如果某个键范围包含50%的数据集键,一些执行程序会受到影响,因为太多的行会进入同一个分区。它甚至可能在OOM中失败,同时尝试在内存中对太大的A分区或B分区进行排序(我不明白为什么Spark不能像Hadoop那样对磁盘溢出进行排序?...)或者它失败是因为它试图将两个分区读入内存以进行连接?
所以,这是我的猜测。你能纠正我并帮助理解Spark的工作方式吗?
为什么Spark不能像Hadoop那样对磁盘溢出进行排序?
Spark merge sort join不会溢出到磁盘。看看Spark SortMergeJoinExec类,它使用ExternalAppendOnlyUnsafeRowArray,描述如下:
一个仅追加的数组,它严格地将内容保存在内存数组中,直到numRowsInMemoryBufferThreshold达到后,它将切换到一种模式,该模式将在numRowsSpillThreshold满足后刷新到磁盘(如果内存消耗过多,则在此之前)
这与在Web UI的连接操作期间看到任务溢出到磁盘的体验是一致的。
为什么[合并排序连接]可以抛出OOM?
从Spark内存管理概述:
Spark的洗牌操作(sortByKey、groupByKey、reduceByKey、join等)在每个任务中构建一个哈希表来执行分组,分组通常可能很大。这里最简单的修复方法是提高并行度,以便每个任务的输入集更小。
i、 e.在连接的情况下,增加火花。sql。洗牌分区(partitions),以减小分区和生成的哈希表的大小,并相应地降低OOM的风险。
这是MPP数据库上连接的常见问题,Spark也不例外。正如您所说,要执行连接,同一个连接键值的所有数据都必须进行同位,因此,如果连接键值上的分布不均匀,则数据分布不均匀,一个节点过载。
如果连接的一侧很小,则可以使用地图侧连接。Spark query planner确实应该为您做到这一点,但它是可调的-不确定这是否最新,但它看起来很有用。
您是否在两个表上都运行了分析表?
如果两边都有一个不会破坏连接语义的键,那么可以将其包含在连接中。
我的任务是使用用户填充的int数组合并两个数组,我们必须假设用户最多有10000个输入,用户输入负数停止。然后将数组从最小到最大排序并打印出来。起初我以为这很容易,但当我完成时,我开始得到如下输出: 正如你所看到的,这六个是不合适的,我不知道如何修复它。这是源代码,我已经包括了大量的评论,因为我真的希望你们能帮助我尽你们最大的能力。如果可以使用相同的技术而不在代码中实现新的技术和方法,请这样做。我
我正在复习快速排序的实现(来自CLRS第3版)。我发现数组的递归除法从低索引到中1,然后从中1到高。 合并排序的实现如下所示: 由于它们都使用相同的除法策略,为什么快速排序忽略中间元素从到和到没有包含,而mergesort包含?
我正在维基百科上阅读关于外部排序的文章,我需要理解为什么两阶段合并比一阶段合并更有效。 Wiki:但是,单次合并有一个限制。随着区块数量的增加,我们将内存分成更多的缓冲区,因此每个缓冲区都较小,因此我们必须进行许多较小的读取,而不是较少的较大读取。 因此,对于100 MB内存中的50 GB的排序,使用单个合并过程是没有效率的:磁盘需要用500个数据块中的每个数据块(我们一次从每个数据块读取100M
我正在尝试实现一个不能正常工作的mergesort算法。合并排序的工作方式如下: i、 将未排序的列表划分为n个子列表,每个子列表包含1个元素(1个元素的列表被视为已排序)。 ii.重复合并子列表以产生新排序的子列表,直到只剩下1个子列表。这将是已排序的列表。下面提供了实现。 最初,递归调用此方法,直到只有一个元素。 这是提供的合并方法。 这里的问题是什么? 输入是输出是
我第一次用一个辅助数组实现了合并排序,以尝试使用JavaScript实现可视化。这似乎应该是有效的,但它不是。任何帮助或提示将不胜感激。 编辑:我忘了包括它不起作用的情况。它们是: 输入:[4, 2, 5, 6, 7, 7]输出:[4, 2, 5, 6, 7, 7] 输入:[6,6,6,4,6,2]输出:[4,6,6,6,6,2] 输入:[6, 7, 3, 10, 7, 9, 6, 3, 4, 6
在index.hpp中,我创建了一个具有多个数据成员的类,如、等。我在类外部定义了一个构造函数。在program.cpp中,我创建了一个名为SAM的对象。当我试图编译它时,它显示错误。什么原因? Program.cpp index.hpp 错误信息