>
我正在使用Kafka Spark流媒体来获取流媒体数据。
val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
我正在使用此数据流并处理RDD
val output = lines.foreachRDD(rdd =>
rdd.foreachPartition { partition =>
partition.foreach { file => runConfigParser(file)}
})
runConfigParser是一种JAVA方法,它解析文件并生成必须保存在HDFS中的输出。因此,多个节点将处理RDD并将输出写入一个HDFS文件。因为我想把这五个装进蜂箱。
我应该输出runConfigParser的结果并使用sc.parallze(输出)。保存ASTEXTFILE(path),以便所有节点将RDD输出写入单个HDFS文件。?这种设计有效吗?
我将在HIVE中加载这个HDFS文件(它将不断更新为其流数据),并使用Impala进行查询。
您可以使用一个函数“合并”saveAsTextFile
的结果。像这样:
import org.apache.hadoop.fs._
def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
val sourceFile = hdfsServer + "/tmp/"
rdd.saveAsTextFile(sourceFile)
val dstPath = hdfsServer + "/final/"
merge(sourceFile, dstPath, fileName)
}
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
val destinationPath = new Path(dstPath)
if (!hdfs.exists(destinationPath)) {
hdfs.mkdirs(destinationPath)
}
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
}
不需要。因为您需要一个HDFS文件,所以为RDD分区创建许多HDFS文件的saveAsTextFile不能满足您的要求。
为了得到一个HDFS文件,duce
/收集
输出并调用HDFSJavaAPI来创建HDFS文件。这种方法效率低下,因为所有输出都需要在最后一个Spark操作时到达Spark驱动程序。
一、介绍 HDFS (Hadoop Distributed File System)是 Hadoop 下的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。 二、HDFS 设计原理 2.1 HDFS 架构 HDFS 遵循主/从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成: NameNode : 负责执行有关 文件系统命名空间 的操作,例如打开,
我正在尝试运行批处理文件, 它将转到祖父母文件夹,并对其所有子存储库目录执行git pull。出于某种原因,这是行不通的。我怎样才能让它正常工作? 参考以下内容: 如何git拉多个repos上的窗口? 使用Windows 10
我正在使用azure databricks和blob存储。我有一个存储帐户,每小时存储来自物联网设备的数据。因此,文件夹结构是{年/月/日/小时},它将数据存储为csv文件。我的要求是,需要每天从azure databricks访问文件(因此从0-23开始将有24个文件夹),并需要执行一些计算。
简介 注意:Xiaomi Cloud-ML服务访问HDFS数据,由于各个机房和用户网络环境差别,请首先联系Cloud-ML开发人员,咨询Cloud-ML服务是否可以访问特定的HDFS集群。 使用Docker容器 我们已经制作了Docker镜像,可以直接访问c3prc-hadoop集群。 sudo docker run -i -t --net=host -e PASSWORD=mypassword
我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b