当前位置: 首页 > 知识库问答 >
问题:

工人之间的平衡RDD划分-火花

晁砚
2023-03-14

我正在使用x:key,y:set(values)RDD,并将其调用为

#values: RDD of tuples (key, val)    
file = values.groupByKey().mapValues(set).cache()
info_file = array(file.map(lambda (x,y): len(y)).collect())
var = np.var(info_file) #extremely high
def f():
     ...
file.foreachPartition(f)

len(y)的方差非常高,以至于大约1%的对集(用百分位数方法验证)使得集合中的值总数的20%总数=np.sum(info_file)。如果Spark随机使用shuffle进行分区,那么很有可能会有1%的数据落入同一分区,从而导致工作人员之间的负载不平衡。

有没有办法确保“重”元组在分区中正常分布?我实际上将文件分成两个分区,,基于阈值 = np.percentile(info_file,99.9) 给出的 len(y) 阈值,以便分离这组元组,然后重新分区。

light = file.filter(lambda (x,y): len(y) < threshold).cache()
heavy = file.filter(lambda (x,y): len(y) >= threshold).cache()

light.foreachPartition(f)
heavy.foreachPartition(f)

但获得几乎相同的运行时间。负载可能已经优化,只是想检查我是否可以做更多/更好的事情。

共有1个答案

梅庆
2023-03-14

您可以使用Ganglia来监控集群负载。这将为您提供一个很好的指示,表明可能导致集群负载不均衡的任何数据倾斜。

如果您确实有一个不幸的数据偏差,有一些方法可以解决它,例如重组数据或对键加盐。例如,参见StackOverflow Q

请注意,您也可以像现在这样,将数据分为< code >重分区和< code >轻分区,但在这种情况下,您希望< code >缓存文件,而不是< code >重和< code >轻,因为您需要多次处理< code >文件。像这样:

cachedFile = file.cache()

light = cachedFile.filter(lambda (x,y): len(y) < threshold)
heavy = cachedFile.filter(lambda (x,y): len(y) >= threshold)

light.foreachPartition(f)
heavy.foreachPartition(f)

希望有帮助。

 类似资料:
  • 是否有一种方法可以在微服务的两个pod之间进行主动和被动负载平衡。假设我有两个运行微服务的实例(pod),它是使用K8s服务对象公开的。是否有一种方法来配置负载平衡,使一个pod始终获得请求,当该pod停机时,另一个po将开始接收请求? 我在该服务的顶部还有一个ingress对象。

  • 我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的

  • 假设我有一个包含1,000个元素和10个执行器的RDD。现在我用10个分区并行化RDD,并由每个执行器处理100个元素(假设每个执行器1个任务)。 我的困难是,其中一些分区任务可能比其他任务花费的时间要长得多,所以说8个执行器将很快完成,而剩下的2个执行器则将被困在执行时间更长的任务中。因此,主进程将等待2完成后再继续,8将处于空闲状态。 有什么方法可以让无所事事的执行者从忙碌的执行者那里“拿走”

  • 我创建并持久化一个df1,然后在其上执行以下操作: 我有一个有16个节点的集群(每个节点有1个worker和1个executor,4个内核和24GB Ram)和一个master(有15GB Ram)。Spark.shuffle.Partitions也是192个。它挂了2个小时,什么也没发生。Spark UI中没有任何活动。为什么挂这么久?是dagscheduler吗?我怎么查?如果你需要更多的信息

  • 我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不

  • 我已经配置了连接到Cassandra集群的独立spark集群,其中有1个主服务器、1个从服务器和Thrift服务器,该服务器用作Tableau应用程序的JDBC连接器。无论怎样,当我启动任何查询时,从属服务器都会出现在工作者列表中。所有工作负载都由主执行器执行。同样在Thrift web控制台中,我观察到只有一个执行器处于活动状态。 基本上,我希望火花集群的两个执行器上的分布式工作负载能够实现更高