如果rdd1
和rdd2
具有相同的分区符,
join?<>
否。如果两个RDD具有相同的分区器,则连接
将不会导致混洗。您可以在CoGroupedRDD.scala
中看到这一点:
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
}
}
}
然而,请注意,没有混洗并不意味着节点之间不需要移动数据。两个RDD可能具有相同的分区器(被共同分区),但相应的分区位于不同的节点上(不被共同定位)。
这种情况仍然比洗牌好,但这是需要记住的。同一地点可以提高性能,但很难保证。
方法process()不更改记录的字段(键)值。假设所有算子的并行度都是2,那么keyBy()at(2)是否也会导致网络洗牌呢?也许keyBy()at(2)由于密钥值不变而具有前向策略避免网络通信代价的效果? 太好了~
我在火花变换函数中有一个简单的问题。 coalesce(numPartitions) - 将 RDD 中的分区数减少到 numPartitions。可用于在筛选大型数据集后更有效地运行操作。 我的问题是 > < Li > < p > coalesce(num partitions)真的会从filterRDD中删除空分区吗? coalesce(numPartitions)是否经历了洗牌?
我有一个项目的RDD,还有一个函数 。 收集RDD的两个小样本,然后这两个数组。这很好,但无法扩展。 有什么想法吗? 谢谢 编辑:下面是如何压缩每个分区中具有不同项数的两个示例: 关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。
这只是出于好奇。 如果是这样,那么扩展到一个极端的情况,如果您的内存以某种方式结束了超级碎片(例如,每隔一个字节都被分配为ala 我想站台不重要? 很抱歉提出了一个扩展的问题,但这也会发生在其他语言中吗,比如Java/C#?
因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我
我试图实现一个函数,我使用实现了它,该函数运行良好,但用于并行化。我正在将函数转换为可拆分的Do函数。我使用在本地运行了一个包含5000个元素的单元测试,而在DataFlow中运行了相同的单元测试,但失败了,错误如下。 下面给出了本地DirectRunner和云数据流运行器之间的数据差异。 本地中的DirectRunner: 在示例输入PCollection元素中有5000个ABC 云中的Data