当前位置: 首页 > 知识库问答 >
问题:

处理Spark Streaming rdd并存储到单个HDFS文件

唐兴思
2023-03-14

>

  • 我正在使用Kafka Spark流媒体来获取流媒体数据。

    val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)
    

    我正在使用此数据流并处理RDD

    val output = lines.foreachRDD(rdd => 
            rdd.foreachPartition { partition => 
                partition.foreach { file => runConfigParser(file)}
    })
    

    runConfigParser是一种JAVA方法,它解析文件并生成必须保存在HDFS中的输出。因此,多个节点将处理RDD并将输出写入一个HDFS文件。因为我想把这五个装进蜂箱。

    我应该输出runConfigParser的结果并使用sc.parallze(输出)。保存ASTEXTFILE(path),以便所有节点将RDD输出写入单个HDFS文件。?这种设计有效吗?

    我将在HIVE中加载这个HDFS文件(它将不断更新为其流数据),并使用Impala进行查询。

  • 共有2个答案

    柳德义
    2023-03-14

    您可以使用一个函数“合并”saveAsTextFile的结果。像这样:

    import org.apache.hadoop.fs._
    
    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
        val sourceFile = hdfsServer + "/tmp/" 
        rdd.saveAsTextFile(sourceFile)
        val dstPath = hdfsServer + "/final/" 
        merge(sourceFile, dstPath, fileName)
      }
    
      def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
        val hadoopConfig = new Configuration()
        val hdfs = FileSystem.get(hadoopConfig)
        val destinationPath = new Path(dstPath)
        if (!hdfs.exists(destinationPath)) {
          hdfs.mkdirs(destinationPath)
        }
        FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
      }
    
    宣意致
    2023-03-14

    不需要。因为您需要一个HDFS文件,所以为RDD分区创建许多HDFS文件的saveAsTextFile不能满足您的要求。

    为了得到一个HDFS文件,duce/收集输出并调用HDFSJavaAPI来创建HDFS文件。这种方法效率低下,因为所有输出都需要在最后一个Spark操作时到达Spark驱动程序。

     类似资料:
    • 一、介绍 HDFS (Hadoop Distributed File System)是 Hadoop 下的分布式文件系统,具有高容错、高吞吐量等特性,可以部署在低成本的硬件上。 二、HDFS 设计原理 2.1 HDFS 架构 HDFS 遵循主/从架构,由单个 NameNode(NN) 和多个 DataNode(DN) 组成: NameNode : 负责执行有关 文件系统命名空间 的操作,例如打开,

    • 我正在尝试运行批处理文件, 它将转到祖父母文件夹,并对其所有子存储库目录执行git pull。出于某种原因,这是行不通的。我怎样才能让它正常工作? 参考以下内容: 如何git拉多个repos上的窗口? 使用Windows 10

    • 我正在使用azure databricks和blob存储。我有一个存储帐户,每小时存储来自物联网设备的数据。因此,文件夹结构是{年/月/日/小时},它将数据存储为csv文件。我的要求是,需要每天从azure databricks访问文件(因此从0-23开始将有24个文件夹),并需要执行一些计算。

    • 简介 注意:Xiaomi Cloud-ML服务访问HDFS数据,由于各个机房和用户网络环境差别,请首先联系Cloud-ML开发人员,咨询Cloud-ML服务是否可以访问特定的HDFS集群。 使用Docker容器 我们已经制作了Docker镜像,可以直接访问c3prc-hadoop集群。 sudo docker run -i -t --net=host -e PASSWORD=mypassword

    • 我的问题陈述。读取包含1000万数据的csv文件,并将其存储在数据库中。用尽可能少的时间 我使用java的简单多线程执行器实现了它,其逻辑几乎与spring batch的chunk相似。从csv文件中读取预配置数量的数据,然后创建一个线程,并将数据传递给线程,该线程验证数据,然后写入多线程运行的文件。完成所有任务后,我将调用sql loader来加载每个文件。现在我想把这段代码移到spring b