textFile方法用于读取文本文档并返回一个RDD对象.
使用的时候只需要传入文件路径和分区数量即可,分区数量也可以不传.
不传的话会有默认值,默认值为 defaultMinPartitions
. 至于这个值是多少,要查看其余的代码
def textFile(
//表示要加载读取的文件的路径
path: String,
//表示分区数量,可以不赋值, 不赋值时,会使用默认值 local模式不指定时会使用1
//cluster模式不指定时, 默认使用的是CPU的核数,如果所有worker的core加起来大于2,则使用2.
//当指定值时,分区数量一定是指定时的数量
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
//读取文件,返回RDD对象
//底层是用haoopFile算子去读取hdfs上的文件,封装成pair形式的RDD,然后取出pair._2的值.
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopFile如何读取文件返回RDD对象,源码如下
// @return RDD of tuples of key and corresponding value
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
3 分区数量会赋值给底层的getSplit(job conf,numpartitions)方法.
因此,分区数量由开发人员决定.
4 每个分区 对应的真正的原始数据,是由getSplit()方法里的逻辑来决定的.