我使用Spark Dataframe API从NFS共享加载/读取文件,然后将该文件的数据保存/写入HDFS。
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
object TestMoreThan1CSV2DF {
private val source: String = "file:///data/files/"
private val destination = "hdfs://<myHostIP>:8020/raw/"
private val fileFormat : String = "com.databricks.spark.csv"
def main(args:Array[String]):Unit={
val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))
for(file<-fileArray){
// reading csv file from shared location and taking whole data in a dataframe
var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)
// variable for holding destination location : HDFS Location
var finalDestination: String = destination+file.getName
// saving data into HDFS
writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
}
}
def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
try{
sqlContext.read.format(fileFormat)
.option("header", header) // Use first line of all files as header
.option("inferSchema", inferSchema) // Automatically infer data types
.load(source)
}
catch{
case ex: OnboardingException => {
throw ex;
}
}
}
def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
try{
df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
}
catch{
Case ez : OnboardingException => {
throw ez;
}
}
}
}
1)整个代码在哪里运行?它是在驱动程序上运行?还是同时使用两个worker?
2)load()和save()API是否在worker节点上运行,它是否并行工作?如果是,那么两个worker如何跟踪while的读写部分?
3)到现在为止,我在“for”循环中顺序地读取每个文件,并顺序地处理每个文件,有没有可能使它成为一个多线程应用程序,其中每个文件被分配给一个线程,以并行地执行端到端的读写。在这样做的时候,磁盘IO会有任何限制吗?
从另一个线程复制了一个很好的解释,用于我的查询:区分Apache Spark中的驱动程序代码和工作代码
在这里也复制它的一部分:转换创建的闭包中发生的所有事情都发生在worker上。它的意思是,如果在map(...)、filter(...)、mapPartitions(...)、groupby*(...)、aggregateby*(...)中传递了一些内容对工人执行死刑。它包括从持久存储或远程源读取数据。
像计数、减少(...)、折叠(...)这样的操作通常在司机和工人身上执行。沉重的起重是由工人并行执行的,一些最后的步骤,如减少从工人那里收到的输出,是在司机上依次执行的。
3)还在努力做到这一点,一旦做到就会回答这一点。
谢谢
我是一名spark/纱线新手,在提交纱线集群上的spark作业时遇到exitCode=13。当spark作业在本地模式下运行时,一切正常。 我使用的命令是: Spark错误日志:
Saving data using afterChange callback Saving data locally using persistentState Why should I use persistentState rather than regular LocalStorage API? Saving data using afterChange callback Use the a
save 用法 Usage: docker save [OPTIONS] IMAGE [IMAGE...] Save an image(s) to a tar archive (streamed to STDOUT by default) --help=false Print usage -o, --output= Write to an file, instea
并编写下面的查询,它是只在我的master上运行,还是将所有10个节点都用作worker? 如果不是,我必须做什么才能让我的Spark Sql使用完整的集群?
在k8s集群中。如何配置zeppelin在现有spark集群中运行spark作业,而不是旋转一个新的Pod? 我有一个k8s集群正在运行,我想在其中运行与齐柏林飞艇的火花。 Spark使用官方的Bitnami/Spark helm chart(v3.0.0)进行部署。我有一个主舱和两个工人舱运转良好,一切都很好。 短伪DockerFile: 我稍微修改了。(Image,imagePullSecre
我的Spark 2.3.3集群运行良好。我在“http://master-address:8080”上看到了GUI,其中有2个空闲的工作人员。 我有一个Scala应用程序,它创建上下文并启动作业。我不使用spark-submit,我以编程方式开始工作,这是许多答案与我的问题不同的地方。 在“my-app”中,我创建了一个新的SparkConf,代码如下(略有缩写): 司机跑到哪里去了?我如何找到它