当前位置: 首页 > 知识库问答 >
问题:

火花RDD中的分区数

田丰
2023-03-14

我通过指定分区的数量从文本文件创建RDD(Spark 1.6)。但它给我的分区数与指定的分区数不同。

案例1

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 1)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at textFile at <console>:27

scala> people.getNumPartitions
res36: Int = 1

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 2)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile at <console>:27

scala> people.getNumPartitions
res37: Int = 2

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 3)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at textFile at <console>:27

scala> people.getNumPartitions
res38: Int = 3

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 4)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:27

scala> people.getNumPartitions
res39: Int = 4

案例2

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 0)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at textFile at <console>:27

scala> people.getNumPartitions
res47: Int = 1

案例3

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 5)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[58] at textFile at <console>:27

scala> people.getNumPartitions
res40: Int = 6

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 6)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at textFile at <console>:27

scala> people.getNumPartitions
res41: Int = 7

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 7)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at textFile at <console>:27

scala> people.getNumPartitions
res42: Int = 8

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 8)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[64] at textFile at <console>:27

scala> people.getNumPartitions
res43: Int = 9

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 10)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[68] at textFile at <console>:27

scala> people.getNumPartitions
res45: Int = 11

案例4

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 9)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile at <console>:27

scala> people.getNumPartitions
res44: Int = 11

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 11)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[70] at textFile at <console>:27

scala> people.getNumPartitions
res46: Int = 13

文件/home/pvikash/data/test的内容。txt是:

这是一个测试文件。将用于rdd分区

基于以上案例,我有几个问题。

  1. 对于案例2,显式指定的分区数为0,但实际分区数为1(即使默认最小分区为2),为什么实际分区数为1?
  2. 对于案例3,为什么在指定数量的分区上实际分区数更改了1?
  3. 对于案例4,为什么在指定数量的分区上实际分区数更改了2?
  4. 为什么火花在案例1、案例2、案例3和案例4中表现不同?
  5. 如果输入数据很小(可以很容易地放入单个分区),那么为什么Spark会创建空分区?

任何解释都将不胜感激。

共有1个答案

司宏伯
2023-03-14

不是一个完整的答案,但它可能会让你更接近它。

您传入的数字称为minSplits。它对最小分区数有影响,仅此而已。

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String]

拆分数量应由< code>getSplits方法(docs)控制

这篇SO帖子应该回答问题5

 类似资料:
  • 我有RDD,其中每个记录都是int: 我所需要做的就是将这个RDD拆分成批。即。制作另一个RDD,其中每个元素都是固定大小的元素列表: 这听起来微不足道,然而,最近几天我很困惑,除了下面的解决方案之外,什么也找不到: > 使用ZipWithIndex枚举RDD中的记录: 这将得到我所需要的,然而,我不想在这里使用组。当您使用普通映射Reduce或一些抽象(如Apache Crunch)时,它是微不

  • 我想了解以下关于火花概念的RDD的事情。 > RDD仅仅是从HDFS存储中复制某个节点RAM中的所需数据以加快执行的概念吗? 如果一个文件在集群中被拆分,那么对于单个flie来说,RDD从其他节点带来所有所需的数据? 如果第二点是正确的,那么它如何决定它必须执行哪个节点的JVM?数据局部性在这里是如何工作的?

  • 我对spark有疑问:HDFS块vs集群核心vs rdd分区。 假设我正在尝试在HDFS中处理一个文件(例如块大小为64MB,文件为6400MB)。所以理想情况下它确实有100个分裂。 我的集群总共有 200 个核心,我提交了包含 25 个执行程序的作业,每个执行程序有 4 个核心(意味着可以运行 100 个并行任务)。 简而言之,我在rdd中默认有100个分区,100个内核将运行。 这是一个好方

  • 谁能给我解释一下吗? 然而,另一方面是,对于不能保证产生已知分区的转换,输出RDD将没有分区器集。例如,如果对哈希分区的键/值对RDD调用map(),则传递给map()的函数在理论上可以更改每个元素的键,因此结果将不会有分区器。Spark不会分析函数以检查它们是否保留密钥。相反,它提供了另外两个操作,mapValues()和flatMap Values(),它们保证每个元组的键保持不变。 Mate

  • 我想了解火花流中的一个基本的东西。我有50个Kafka主题分区和5个执行者的数字,我正在使用DirectAPI所以没有。的RDD分区将为50个。这个分区将如何在5个执行器上处理?将在每个执行器上一次触发进程1个分区,或者如果执行器有足够的内存和内核,它将在每个执行器上并行处理超过1个分区。

  • 问题内容: 在我的猪代码中,我这样做: 我想用spark做同样的事情。但是,不幸的是,我看到我必须成对进行: 是否有联合运算符可以让我一次对多个rdds进行操作: 例如 这是一个方便的问题。 问题答案: 如果这些是RDD,则可以使用方法: 没有等效项,但这只是一个简单的问题: 如果要在RDD上大量使用和重新创建,可能是避免与准备执行计划的成本相关的问题的更好选择: