所以,我知道一般来说,在以下情况下应该使用 coalesce():
由于过滤器
或其他可能导致减少原始数据集(RDD、DF)的操作,分区数量减少。coalesce()
对于在过滤大型数据集后更有效地运行操作很有用。
我也知道它比< code>repartition更便宜,因为它通过仅在必要时移动数据来减少洗牌。我的问题是如何定义< code>coalesce采用的参数(< code > idealpartionno )。我正在做一个项目,这个项目是另一个工程师交给我的,他用下面的计算来计算这个参数的值。
// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)
val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR
然后将其用于分区器
对象:
val partitioner = new HashPartitioner(idealPartionionNo)
但也用于:
RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)
这是正确的方法吗?< code > idealpartionno 值计算背后的主要思想是什么?什么是< code >重新分配因子?我通常如何定义它?
此外,由于YARN负责实时识别可用的执行器,是否有一种方法可以实时获取该编号(available_EXECUTOR_INSTANCES
),并将其用于计算理想PartitionNo
(即将
替换为
理想情况下,以下形式的一些实际例子:
n
个执行器,其中m
个核,分区因子等于
然后:
另外,如果你能给我推荐一个很好的博客来解释这些,我将不胜感激。
正如其他人所回答的,没有公式可以计算你的要求。也就是说,你可以对第一部分进行有根据的猜测,然后随着时间的推移对其进行微调。
第一步是确保您有足够的分区。如果您没有_OF_EXECUTOR_INSTANCES个执行程序,并且每个执行程序没有_OF_EXECUTOR_CORES个核心,那么您可以同时处理NO _ OF _ EXECUTOR _ INSTANCES * NO _ OF _ EXECUTOR _ CORES个分区(每个分区将处理特定实例的特定核心)。也就是说,这是假设所有东西都在内核之间平均分配,并且所有东西都需要完全相同的时间来处理。这种情况很少发生。由于本地性(例如,数据需要来自不同的节点)或者仅仅是因为它们不平衡(例如,如果您有按根域分区的数据,那么包括google在内的分区可能会非常大),其中一些很可能会在其他分区之前完成。这就是REPARTITION_FACTOR发挥作用的地方。这个想法是,我们“超额预定”每个内核,因此如果一个内核完成得非常快,一个内核完成得很慢,我们可以选择在它们之间分配任务。系数2-3通常是个好主意。
现在让我们看一下单个分区的大小。假设您的整个数据大小为 X MB,并且您有 N 个分区。每个分区的平均 X/N MB 数。如果 N 相对于 X 很大,那么您可能具有非常小的平均分区大小(例如几 KB)。在这种情况下,降低 N 通常是一个好主意,因为管理每个分区的开销变得太高。另一方面,如果大小非常大(例如几GB),那么您需要同时保存大量数据,这会导致垃圾回收,高内存使用率等问题。
最佳大小是一个很好的问题,但通常人们似乎更喜欢100-1000MB的分区,但事实上,几十MB的分区可能也很好。
您应该注意的另一件事是,在计算分区如何变化时。例如,假设您从1000个分区开始,每个分区100MB,然后过滤数据,使每个分区变为1K,那么您可能应该合并。当您进行分组或加入时,可能会发生类似的问题。在这种情况下,分区的大小和分区的数量都会发生变化,可能会达到不期望的大小。
您的问题是正确的,但Spark分区优化完全取决于您正在运行的计算。你需要有一个很好的理由来重新划分/合并;如果您只是计算一个RDD(即使它有大量稀疏的分区),那么任何重新分区/合并步骤都会让您慢下来。
重新分区(n)
(与coalesce(n,shuffle=true)和
的区别与执行模型有关。shuffle模型获取原始RDD中的每个分区,将其数据随机发送给所有执行器,并生成具有新(更小或更大)分区数的RDD。无洗牌模型创建一个新的RDD,将多个分区作为一个任务加载。
让我们考虑这样的计算:
sc.textFile("massive_file.txt")
.filter(sparseFilterFunction) // leaves only 0.1% of the lines
.coalesce(numPartitions, shuffle = shuffle)
如果< code>shuffle
为< code>true,则文本文件/过滤器计算发生在由< code>textFile中的缺省值给定的许多任务中,微小的过滤结果被打乱。如果< code>shuffle为< code>false,则任务总数最多为< code>numPartitions。
如果numPartitions
为1,则差异非常明显。shuffle模型将并行处理和过滤数据,然后将0.1%的过滤结果发送给一个执行器,用于下游DAG操作。无混洗模型将从一开始就在一个核心上处理和过滤数据。
考虑您的下游操作。如果只使用一次该数据集,那么可能根本不需要重新分区。如果您要将过滤后的RDD保存以供以后使用(例如,保存到磁盘),那么请考虑上面的权衡。熟悉这些模型需要经验,当一个模型表现更好时,请尝试这两种模型,看看它们的表现如何!
在实践中,最佳分区数量更多地取决于您拥有的数据、您使用的转换和整体配置,而不是可用资源。
treeReduce
相比的 reduce
),则大量分区会导致驱动程序上的负载增加。您可以找到许多规则,这些规则建议超额订阅分区与内核数量(因子 2 或 3 似乎很常见)或将分区保持在一定大小,但这不考虑您自己的代码:
在我看来:
>
不要尝试根据执行器或核心的数量使用固定数量的分区。首先了解您的数据和代码,然后调整配置以反映您的理解。
通常,确定集群表现出稳定行为的每个分区的原始数据量相对容易(根据我的经验,它在几百兆字节的范围内,这取决于您用于加载数据的格式、数据结构和配置)。这就是你要找的“神奇数字”。
总的来说,有些事情你必须记住:
*byKey
,join
,RDD.partitionBy
,Dataset.repartition
)都可能导致数据分布不均匀。始终监视作业是否存在严重数据偏差的症状。联合
、coGroup
、联接
)的任何操作都可能影响分区数。问题陈述: 我需要得到一个给定数字的最佳面额组合。例如:我有三种面额,给定的数字是30,那么列表应该返回
示例-现在假设我们有一个输入RDD输入,它在第二步中被过滤。现在我想计算过滤后的RDD中的数据大小,并考虑到块大小为128MB,计算需要多少分区才能重新分区 这将帮助我将分区数传递给重新分区方法。 问题 1.如何计算XX的值? SparkSQL /DataFrame有Q2.What类似的方法?
我对Spark相当陌生,所以也许我只是错过或误解了一些基本的东西。如有任何帮助,不胜感激。
本文阐述了使用 TiDB Data Migration(以下简称 DM)对分库分表进行合并迁移的场景中,DM 相关功能的支持和限制,旨在给出一个业务的最佳实践(使用默认的“悲观协调”模式)。 独立的数据迁移任务 在分库分表合并迁移的实现原理部分,我们介绍了 sharding group 的概念,简单来说可以理解为需要合并到下游同一个表的所有上游表即组成一个 sharding group。 当前的
我需要能够将一组已知大小的对象分配到3组。例如,给定一个像下面这样的有序列表,我想要找到两个除法或分离点,使三个组具有相似的和。 每组的总和必须大致相等,组2和组3的总和不能超过第一组的总和超过一个规定的量(例如10)。理想情况下,第一组比其他组稍大一些。无法更改项目的顺序。每个组由原始列表的连续元素组成。每个元素都放在一个组中。 在这种情况下,预期的解决办法是: 用例是预先计算(服务器端)按优先
问题内容: 我有ID为的商品。现在我有如下数据。每行都有一个offerId。由数组中的组合组成。是那个的价值 现在,我必须选择所有给我提供最佳ID组合(即最大总折扣)的offerId。 例如,在上述情况下:可能的结果可能是: [o2,o4,o5]最大折扣为。 注意。结果offerId应该不会重复ID。id的示例为[1,3,4],[5],[6]都是不同的。 其他组合可以是: 其id为[1],[3,5