我有一个有 30 条记录的 RDD(键/值对:键是时间戳,值是 JPEG 字节数组),
我正在运行 30 个执行器。我想将此 RDD 重新分区为 30 个分区,以便每个分区获得一条记录并分配给一个执行器。
当我使用 rdd.repartition(
30) 时,它会在 30 个分区中重新分区我的 rdd,但有些分区得到 2 条记录,有些得到 1 条记录,有些没有得到任何记录。
在Spark中,有没有什么方法可以将我的记录平均分配到所有分区。
您可以使用分区通过
命令并提供多个分区来强制执行新的分区。默认情况下,分区器是基于哈希的,但您可以切换到基于范围的以获得更好的分布。如果您真的想强制重新分区,您可以使用随机数生成器作为分区函数(在PySpark中)。
my_rdd.partitionBy(pCount, partitionFunc = lambda x: np.random.randint(pCount))
但是,这经常会导致低效的洗牌(节点之间传输大量数据),但是如果您的进程计算有限,那么它是有意义的。
下面是一个将rdd重新分区为n分区
分区的示例,以便项目在分区中均匀分布。每个分区中的项目数量最多相差1个。
evenly_repartitioned = (
rdd
.zipWithIndex()
.map(lambda p: (p[1], p[0]))
.partitionBy(N, lambda p: p)
.values()
)
确实如此:
(项目,索引)
的元组,其中索引位于整个RDD上(index,项目)
N
分区,将项目移动到分区index%N
- 只取值,删除元组中的索引。
请注意,这比默认的基于哈希的重新分区要慢,因为在< code > ziptwithindex()
期间,它需要另一个Spark阶段来计算每个分区的大小。
可以使用盐渍技术,包括添加新的“假”密钥并与当前密钥一起使用以更好地分发数据。
(这里是腌制链接)
我们在AWS上运行16个节点kafka集群,每个节点是m4. xLargeEC2实例,具有2TB EBS(ST1)磁盘。Kafka版本0.10.1.0,目前我们有大约100个主题。一些繁忙的话题每天会有大约20亿个事件,一些低量的话题每天只有数千个。 我们的大多数主题在生成消息时使用UUID作为分区键,因此分区分布相当均匀。 我们有相当多的消费者使用消费群体从这个集群消费。每个使用者都有一个唯一的
0.1-0.2:********** 0.2-0.3:******** 0.3-0.4:********* 0.5-0.6:********* 0.6-0.7:********* 0.7-0.8:********* 0.4-0.5:********* 0.5-0.6:********* 0.6-0.7:********* 0.1-0.2:********* 0.2-0.3:********* 0.
我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。 案例1 案例2 案例3 案例4 文件/home/pvikash/data/test的内容。txt是: 这是一个测试文件。将用于rdd分区 基于以上案例,我有几个问题。 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1? 对于案例3,为什么在指定数量的
我在任何地方都找不到如何在RDD内部执行重新分区?我知道您可以在RDD上调用重新分区方法来增加分区数量,但它是如何在内部执行的呢? 假设,最初有5个分区,他们有- 第一个分区 - 100 个元素 第二个分区 - 200 个元素 第 3 个分区 - 500 个元素 第 4 个分区 - 5000 个元素 第 5 分区 - 200 个元素 一些分区是倾斜的,因为它们是从HBase加载的,并且数据没有正确
谁能给我解释一下吗? 然而,另一方面是,对于不能保证产生已知分区的转换,输出RDD将没有分区器集。例如,如果对哈希分区的键/值对RDD调用map(),则传递给map()的函数在理论上可以更改每个元素的键,因此结果将不会有分区器。Spark不会分析函数以检查它们是否保留密钥。相反,它提供了另外两个操作,mapValues()和flatMap Values(),它们保证每个元组的键保持不变。 Mate