找到给定RDD的每个分区大小的最佳方法是什么。我正在尝试调试一个扭曲的分区问题,我尝试了以下方法:
l = builder.rdd.glom().map(len).collect() # get length of each partition
print('Min Parition Size: ',min(l),'. Max Parition Size: ', max(l),'. Avg Parition Size: ', sum(l)/len(l),'. Total Partitions: ', len(l))
它适用于小型RDD,但对于大型RDD,它会产生OOM错误。我的想法是,glom()
导致了这种情况的发生。但不管怎样,我只是想知道有没有更好的方法?
而@LostInOverflow的答案非常有效。我用下面的代码找到了另一种方法来计算每个分区的大小和索引。多亏了这篇很棒的帖子。
以下是代码:
l = test_join.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
然后,您可以使用以下代码获得最大和最小大小的分区:
min(l,key=lambda item:item[1])
max(l,key=lambda item:item[1])
如果需要,找到倾斜分区的键,我们可以进一步调试该分区的内容。
使用:
builder.rdd.mapPartitions(lambda it: [sum(1 for _ in it)])
我想在我的spark rdd上做一个映射, 但是,这给了我一个已经关闭的连接异常,正如预期的那样,因为在控件到达之前,我的是关闭的。我想为每个RDD分区创建一个连接,并正确地关闭它。我如何实现这一点? 谢谢
因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我
在Spark流式传输中,是否可以将特定的RDD分区分配给集群中的特定节点(为了数据局部性?) 例如,我得到一个事件流[a,a,a,b,b],并有一个2节点的Spark集群。 我希望所有的a总是去节点1,所有的b总是去节点2。 谢啦!
我有一个RDD[标签点],我想找到标签的最小值和最大值,并应用一些转换,例如从所有这些标签中减去数字5。问题是我已经尝试了各种方法来获取标签,但没有任何工作正常。 如何仅访问 RDD 的标签和功能?有没有办法将它们作为列表[双精度]和列表[向量]例如? 我无法转到数据帧。
例如,我们假设RDD为: 当调用时,它将创建多个文件,如下所示: 我认为可以使用的手动写入S3,如下所示: 不幸的是,connection和bucket对象既不是可序列化的,也不是线程安全的(我推测),所以我不能像上面那样共享节点之间的连接。 我想我可以连接到S3&在函数本身中获取bucket,但是这个操作肯定会使AWS限制我的API使用,因为我有大量的RDD。 为了以我所描述的格式将数据快速持久
我通读了地图和地图分区之间的理论差异, 但我下面描述的问题更多地基于GC活动 = = 提前感谢。任何帮助都将不胜感激。