我通过指定分区的数量从文本文件创建RDD。但它给我的分区数与指定的分区数不同。
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 0)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at textFile at <console>:27
scala> people.getNumPartitions
res47: Int = 1
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 1)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at textFile at <console>:27
scala> people.getNumPartitions
res36: Int = 1
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 2)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile at <console>:27
scala> people.getNumPartitions
res37: Int = 2
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 3)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at textFile at <console>:27
scala> people.getNumPartitions
res38: Int = 3
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 4)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile at <console>:27
scala> people.getNumPartitions
res39: Int = 4
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 5)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[58] at textFile at <console>:27
scala> people.getNumPartitions
res40: Int = 6
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 6)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at textFile at <console>:27
scala> people.getNumPartitions
res41: Int = 7
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 7)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at textFile at <console>:27
scala> people.getNumPartitions
res42: Int = 8
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 8)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[64] at textFile at <console>:27
scala> people.getNumPartitions
res43: Int = 9
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 9)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile at <console>:27
scala> people.getNumPartitions
res44: Int = 11
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 10)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[68] at textFile at <console>:27
scala> people.getNumPartitions
res45: Int = 11
scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 11)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[70] at textFile at <console>:27
scala> people.getNumPartitions
res46: Int = 13
文件 /home/pvikash/data/test.txt的内容是:
This is a test file.
Will be used for rdd partition.
我试图理解为什么这里的分区数量在变化,如果我们有小数据(可以容纳一个分区),那么为什么spark会创建空分区?
任何解释都将不胜感激。
在spark中,函数textFile调用hadoopFile函数。
如果你检查hadoopFile的签名,看起来像
def hadoopFile[K, V](path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = {
因此,您指定的分区是RDD将拥有的最小分区数。然而,每个分区的大小将由文件输入格式中的不同函数< code>computeSplitSize决定。
因此,当您设置并行性时,您可以保证至少获得那么多分区,但是确切的数字可能比您拥有的要大。
有一个不错的博客与此相关。
我有一个关于RDD中默认分区的问题。 我不知道为什么会这样。你能帮忙吗。 谢了!
这是将Spark dataframe保存为Hive中的动态分区表的后续操作。我试图在答案中使用建议,但无法在Spark 1.6.1中使用 任何推动这一进程的帮助都是感激的。 编辑:还创建了SPARK-14927
我有以下制表符分隔的示例数据集: 我正在对此数据运行一些转换,最终数据位于spark dataset中。之后,我用“period”分区将该数据集写入s3。因为我也希望在s3文件中使用period,所以我正在从from period列创建另一列“datasetperiod”。 我的scala函数来保存TSV数据集。 在S3上保存数据集的Scala代码。为S3上的分区添加新列datasetPeriod
我试着用谷歌搜索,但找不到答案。 取自ApacheSpark:map vs mapPartitions? RDD的map和mapPartitions有什么区别 map在每个元素级别运行正在使用的函数,而mapPartitions在分区级别运行该函数。 在这种情况下,什么是元素级别?这只是一行吗?
假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?
我正试图用ANT构建脚本编译我的项目。当我更改Eclipse工作区的默认JRE时,假设JDK1.6.0_27 32位,ANT仍然使用JRE 1.7.0 64位(这是系统默认值) 有一个选项设置ANT构建文件使用不同的java版本通过设置外部工具配置,选择ANT构建文件和选择相关的java在JRE选项卡,但这是奇怪的,因为有选项在同一个地方使用运行在相同的JRE作为工作区,这对我不起作用(运行Ecl