当前位置: 首页 > 知识库问答 >
问题:

Spark集群上sqlContext.read...load()和sqlContext.write...save()代码在哪里运行?

王修为
2023-03-14

我使用Spark Dataframe API从NFS共享加载/读取文件,然后将该文件的数据保存/写入HDFS。

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF {
  private val source: String = "file:///data/files/"
  private val destination = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat : String = "com.databricks.spark.csv"

  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for(file<-fileArray){
//  reading csv file from shared location and taking whole data in a dataframe
    var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

//      variable for holding destination location : HDFS Location
    var finalDestination: String = destination+file.getName

//  saving data into HDFS
    writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
    }
  }

 def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
   try{
       sqlContext.read.format(fileFormat)
                       .option("header", header) // Use first line of all files as header
                       .option("inferSchema", inferSchema) // Automatically infer data types
                       .load(source)
   }
   catch{
     case ex: OnboardingException => {
            throw ex;
        }
   }
 }

 def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
   try{
       df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
   }
   catch{
     Case ez : OnboardingException => {
            throw ez;
        }
   }
 }
}

1)整个代码在哪里运行?它是在驱动程序上运行?还是同时使用两个worker?

2)load()和save()API是否在worker节点上运行,它是否并行工作?如果是,那么两个worker如何跟踪while的读写部分?

3)到现在为止,我在“for”循环中顺序地读取每个文件,并顺序地处理每个文件,有没有可能使它成为一个多线程应用程序,其中每个文件被分配给一个线程,以并行地执行端到端的读写。在这样做的时候,磁盘IO会有任何限制吗?

共有1个答案

童宏富
2023-03-14

从另一个线程复制了一个很好的解释,用于我的查询:区分Apache Spark中的驱动程序代码和工作代码

在这里也复制它的一部分:转换创建的闭包中发生的所有事情都发生在worker上。它的意思是,如果在map(...)、filter(...)、mapPartitions(...)、groupby*(...)、aggregateby*(...)中传递了一些内容对工人执行死刑。它包括从持久存储或远程源读取数据。

像计数、减少(...)、折叠(...)这样的操作通常在司机和工人身上执行。沉重的起重是由工人并行执行的,一些最后的步骤,如减少从工人那里收到的输出,是在司机上依次执行的。

3)还在努力做到这一点,一旦做到就会回答这一点。

谢谢

 类似资料:
  • 我是一名spark/纱线新手,在提交纱线集群上的spark作业时遇到exitCode=13。当spark作业在本地模式下运行时,一切正常。 我使用的命令是: Spark错误日志:

  • Saving data using afterChange callback Saving data locally using persistentState Why should I use persistentState rather than regular LocalStorage API? Saving data using afterChange callback Use the a

  • save 用法 Usage: docker save [OPTIONS] IMAGE [IMAGE...] Save an image(s) to a tar archive (streamed to STDOUT by default) --help=false Print usage -o, --output= Write to an file, instea

  • 并编写下面的查询,它是只在我的master上运行,还是将所有10个节点都用作worker? 如果不是,我必须做什么才能让我的Spark Sql使用完整的集群?

  • 在k8s集群中。如何配置zeppelin在现有spark集群中运行spark作业,而不是旋转一个新的Pod? 我有一个k8s集群正在运行,我想在其中运行与齐柏林飞艇的火花。 Spark使用官方的Bitnami/Spark helm chart(v3.0.0)进行部署。我有一个主舱和两个工人舱运转良好,一切都很好。 短伪DockerFile: 我稍微修改了。(Image,imagePullSecre

  • 我的Spark 2.3.3集群运行良好。我在“http://master-address:8080”上看到了GUI,其中有2个空闲的工作人员。 我有一个Scala应用程序,它创建上下文并启动作业。我不使用spark-submit,我以编程方式开始工作,这是许多答案与我的问题不同的地方。 在“my-app”中,我创建了一个新的SparkConf,代码如下(略有缩写): 司机跑到哪里去了?我如何找到它