当前位置: 首页 > 知识库问答 >
问题:

如何使saveAsTextFile不将输出拆分为多个文件?

麻烨
2023-03-14

在Spark中使用Scala时,每当我使用saveAsTextFile转储结果时,它似乎将输出分成多个部分。我只是向它传递一个参数(路径)。

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. 输出数量是否与其使用的减速器数量相对应
  2. 这是否意味着输出被压缩
  3. 我知道我可以使用bash将输出组合在一起,但是有没有一个选项可以将输出存储在单个文本文件中,而不进行拆分??我看了API文档,但它并没有说太多

共有3个答案

靳高明
2023-03-14

您可以调用coalesce(1),然后调用saveAsTextFile()——但如果您有大量数据,这可能是个坏主意。每次拆分都会生成单独的文件,就像在Hadoop中一样,以便让单独的映射器和还原器写入不同的文件。只有当您的数据很少时,才有一个单独的输出文件,在这种情况下,您还可以像@aaronman所说的那样执行collect()。

洪俊拔
2023-03-14

对于使用较大数据集的用户:

>

  • rdd.collect()在这种情况下不应该使用,因为它会将所有数据作为驱动程序中的Array收集,这是获取内存溢出的最简单方法。

    <代码>rdd。聚结(1)。也不应使用saveAsTextFile(),因为在存储数据的单个节点上执行上游阶段的并行将丢失。

    <代码>rdd。合并(1,shuffle=true)。saveAsTextFile()是最好的简单选项,因为它将使上游任务的处理保持并行,然后只对一个节点执行无序移动(rdd.repartition(1))。saveAsTextFile()是一个确切的同义词)。

    <代码>rdd。下面提供的saveAsSingleTextFile()还允许使用特定名称将rdd存储在单个文件中,同时保留rdd的并行性属性。合并(1,shuffle=true)。saveAsTextFile()。

    rdd.coalesce(1, shuffle=true). saveAsTextFile("path/to/file.txt")可能不方便的是,它实际上生成的文件的路径是path/to/file.txt/part-00000而不是path/to/file.txt

    以下解决方案是rdd。saveAsSingleTextFile(“path/to/file.txt”)将实际生成一个路径为path/to/file的文件。txt文件:

    package com.whatever.package
    
    import org.apache.spark.rdd.RDD
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.hadoop.io.compress.CompressionCodec
    
    object SparkHelper {
    
      // This is an implicit class so that saveAsSingleTextFile can be attached to
      // SparkContext and be called like this: sc.saveAsSingleTextFile
      implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
    
        def saveAsSingleTextFile(path: String): Unit =
          saveAsSingleTextFileInternal(path, None)
    
        def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
          saveAsSingleTextFileInternal(path, Some(codec))
    
        private def saveAsSingleTextFileInternal(
            path: String, codec: Option[Class[_ <: CompressionCodec]]
        ): Unit = {
    
          // The interface with hdfs:
          val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
    
          // Classic saveAsTextFile in a temporary folder:
          hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
          codec match {
            case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
            case None        => rdd.saveAsTextFile(s"$path.tmp")
          }
    
          // Merge the folder of resulting part-xxxxx into one file:
          hdfs.delete(new Path(path), true) // to make sure it's not there already
          FileUtil.copyMerge(
            hdfs, new Path(s"$path.tmp"),
            hdfs, new Path(path),
            true, rdd.sparkContext.hadoopConfiguration, null
          )
          // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144
    
          hdfs.delete(new Path(s"$path.tmp"), true)
        }
      }
    }
    

    可以这样使用:

    import com.whatever.package.SparkHelper.RDDExtensions
    
    rdd.saveAsSingleTextFile("path/to/file.txt")
    
    scala prettyprint-override">// Or if the produced file is to be compressed:
    import org.apache.hadoop.io.compress.GzipCodec
    rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
    

    此片段:

    >

  • 首先将带有rdd.saveAsTextFile("path/to/file.txt")的rdd存储在临时文件夹path/to/file.txt.tmp中,就好像我们不想将数据存储在一个文件中一样(这使上游任务的处理保持并行)

    然后,仅使用hadoop文件系统api,我们继续对不同的输出文件进行合并(FileUtil.copyMerge()),以创建最终的输出单个文件路径/到/文件。txt。

  • 夹谷阳夏
    2023-03-14

    它将其保存为多个文件的原因是计算是分布式的。如果输出足够小,以至于您认为可以在一台机器上安装它,那么您可以使用

    val arr = year.collect()
    

    然后将生成的数组保存为文件,另一种方法是使用自定义分区器分区通过,并使其所有内容都进入一个分区,但这并不可取,因为您不会得到任何并行化。

    如果需要使用saveAsTextFile保存文件,可以使用coalesce(1,true)。saveAsTextFile()。这基本上意味着进行计算,然后合并到1个分区。您还可以使用重新分区(1),它只是将shuffle参数设置为true时合并的包装。查看RDD的来源。scala是我解决大部分问题的方法,你应该看看。

     类似资料:
    • 问题内容: 我有以下代码: 我将有许多服务(如一项服务),并且我不想将它们全部放在同一个文件中。 我在Stack Overflow中读了另一个问题,我可能需要这样的其他文件:在该文件中写入所有服务,但是当我启动Node时会抛出该错误。 如何分隔代码? 问题答案: 您可以在不同的文件(例如 test-routes.js)中 定义路由,如下所示: 现在在您的主文件中说出 server.js, 您可以像

    • 问题内容: 我有一个几分钟的.wav文件,我想分割成10秒的另一个.wav文件。 到目前为止,这是我的python代码: 印刷产量: 我知道这是框架列表。我如何为该列表中的每个元素制作一个wav文件(第一个.wav文件将是?Python的模块尚不清楚如何使用框架创建.wav文件。 编辑:这是一个重复的问题,如何在python中将音频文件(wav格式)拼接为1秒拼接? 但是,如果某人的答案不需要,我

    • 问题内容: 我有从mongodb导出的json文件,如下所示: 大约有30000行,我想将每一行拆分成自己的文件。 (我正在尝试将我的数据转移到榻榻米群集上) 我尝试这样做: 但是我发现它似乎减少了行的负载,而当我期望30000个奇数时,运行此命令的输出仅给了我50个奇数文件! 有没有一种逻辑方法可以使此操作不使用任何适合的方法删除任何数据? 问题答案: 假设您不在乎确切的文件名,如果要将输入拆分

    • 问题内容: 将Spring的配置拆分为多个xml文件的正确方法是什么? 此刻我有 /WEB-INF/foo-servlet.xml /WEB-INF/foo-service.xml /WEB-INF/foo-persistence.xml 我有以下内容: 实际问题: 这种方法正确/最佳吗? 我真的需要同时指定中的配置位置 和该板块? 我需要记住什么才能能够引用中定义的?这与 指定有关吗? 更新1:

    • 问题内容: 我的体积太大了,很难找到正确的视图。 如何将其拆分为多个文件,然后导入?是否涉及速度损失? 我可以这样吗? 问题答案: 在Django中,所有内容都是Python模块(* .py)。你可以创建一个具有内部视图的文件夹,并且仍然可以导入视图,因为这也实现了Python模块。但是一个例子会更好。 你的原始图片可能如下所示: 使用以下文件夹/文件结构,它将起到相同的作用: viewsa.py

    • 问题内容: 我需要将整个表从一个MySQL数据库移动到另一个数据库。我没有完全访问第二个权限,只有phpMyAdmin访问权限。我只能上传(压缩)小于2MB的sql文件。但是,第一个数据库表的mysqldump的压缩输出大于10MB。 有没有办法将mysqldump的输出分成较小的文件?我无法使用split(1),因为无法将文件分类(1)到远程服务器上。 还是我错过了其他解决方案? 编辑 第一个发