假设我创建了这样一个RDD(我使用的是Pyspark):
list_rdd = sc.parallelize(xrange(0, 20, 2), 6)
然后我用glom()
方法打印分区元素并获得
[[0], [2, 4], [6, 8], [10], [12, 14], [16, 18]]
Spark是如何决定如何划分我的列表的?元素的特定选择来自哪里?它可以以不同的方式耦合它们,只留下0和10以外的一些其他元素,以创建6个请求的分区。在第二次运行中,分区是相同的。
使用更大的范围,有29个元素,我得到2个元素后跟3个元素的模式的分区:
list_rdd = sc.parallelize(xrange(0, 30, 2), 6)
[[0, 2], [4, 6, 8], [10, 12], [14, 16, 18], [20, 22], [24, 26, 28]]
使用更小范围的9个元素,我得到
list_rdd = sc.parallelize(xrange(0, 10, 2), 6)
[[], [0], [2], [4], [6], [8]]
因此,我推断Spark是通过将列表拆分为一个配置来生成分区的,其中尽可能小的集合后面跟着更大的集合,并重复。
问题是,这个选择背后是否有一个原因,它非常优雅,但它是否也提供了性能优势?
除非您指定特定的分区程序,否则这是“随机的”,因为它取决于该RDD的特定实现。在这种情况下,您可以前往ParallelCollectionsRDD进一步深入研究它。
getPartitions
定义为:
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
其中切片
被注释为(重新格式化以更适合):
/**
* Slice a collection into numSlices sub-collections.
* One extra thing we do here is to treat Range collections specially,
* encoding the slices as other Ranges to minimize memory cost.
* This makes it efficient to run Spark over RDDs representing large sets of numbers.
* And if the collection is an inclusive Range,
* we use inclusive range for the last slice.
*/
请注意,有一些关于内存的注意事项。因此,同样,这将特定于实现。
假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。
从本质上说,我想最小化覆盖消费的成本。这些规则或多或少有以下几点: 每一个客户(我们有几百万个客户)消费不同的物品,这是不同的每一个客户。我们有许多产品,每一个包括一个或多个项目,在给定的成本。产品和成本对所有客户来说都是共同的。此外,还有一些附加的限制因素将哪些产品可以为每个客户组合在一起,但这些限制因素对所有客户来说都是相同的。我正在计划使用Spark解决这个问题,我不熟悉它的算法在这个问题上
这是我的剧本。我已经尝试了所有的组合,甚至只使用一个本地节点。但是看起来没有加载log4j.properties,所有调试级别的信息都被转储了。 log4j属性:
问题内容: 我目前正在使用jconsole监视Java应用程序。内存选项卡使你可以选择: 它们之间有什么区别? 问题答案: 堆内存 堆内存是Java VM从中为所有类实例和数组分配内存的运行时数据区。堆的大小可以固定或可变。垃圾收集器是一个自动内存管理系统,可以回收对象的堆内存。 Eden Space:最初为大多数对象分配内存的池。 幸存者空间:包含在Eden空间的垃圾回收中幸存的对象的池。 终身
问题内容: 我的代码示例: 结果是: 我究竟做错了什么? 问题答案: 您尚未为结果指定比例。请尝试这个 2019编辑:已更新JDK 13的答案。因为希望您现在已从JDK 1.5迁移。 请阅读JDK 13 文档。 JDK 1.5的旧答案 : 结果将为0.33。请阅读API
我的代码示例: 结果是:<代码>1/3=0 我做错了什么?