当前位置: 首页 > 工具软件 > Text_storage > 使用案例 >

spark textFile方法

百里阳平
2023-12-01

textFile方法作用

textFile方法用于读取文本文档并返回一个RDD对象.

使用的时候只需要传入文件路径和分区数量即可,分区数量也可以不传.
不传的话会有默认值,默认值为 defaultMinPartitions. 至于这个值是多少,要查看其余的代码

  def textFile(
  //表示要加载读取的文件的路径
      path: String,
      //表示分区数量,可以不赋值, 不赋值时,会使用默认值 local模式不指定时会使用1
      //cluster模式不指定时, 默认使用的是CPU的核数,如果所有worker的core加起来大于2,则使用2.
      //当指定值时,分区数量一定是指定时的数量
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
 //读取文件,返回RDD对象
 //底层是用haoopFile算子去读取hdfs上的文件,封装成pair形式的RDD,然后取出pair._2的值.
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

hadoopFile如何读取文件返回RDD对象,源码如下

   // @return RDD of tuples of key and corresponding value
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

3 分区数量会赋值给底层的getSplit(job conf,numpartitions)方法.
因此,分区数量由开发人员决定.

4 每个分区 对应的真正的原始数据,是由getSplit()方法里的逻辑来决定的.

 类似资料: