当前位置: 首页 > 知识库问答 >
问题:

Spark如何并行化1TB文件的处理?

柳英豪
2023-03-14
    null
$ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
val dfBigLog = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("/media/username/myUSBdrive/bogusBigLog1TB.log")

dfBigLog.select("Country")
  .groupBy("Country")
  .agg(count($"Country") as "CountryCount")
  .orderBy($"CountryCount".desc).show

问题1:Spark如何并行处理?

我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢?

>

  • 有多少节点用于创建DataFrame?(也许只有一个?)

    假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB。总共782个HDFS块。

    但是Spark是如何使用所有20个节点来创建DataFrame和处理(groupbycount)的呢?Spark每次都使用所有的节点吗?

  • 共有1个答案

    邹嘉致
    2023-03-14

    问题1:Spark如何并行处理(从USB驱动器读取文件)?

    这种情况是不可能的。

    Spark依赖于hadoop兼容的文件系统来读取文件。当您挂载USB驱动器时,您只能从本地主机访问它。试图执行

    .load("/media/username/myUSBdrive/bogusBigLog1TB.log")
    

    问题2:如何使Spark应用程序尽可能快?

    分而治之。
    问题中概述的策略很好。使用Parquet将允许Spark对数据进行投影,并且只对.select(“country”)列进行投影,从而进一步减少需要摄取的数据量,从而加快速度。

    Spark中并行性的基石是分区。同样,当我们从文件中读取时,Spark依赖于Hadoop文件系统。当从HDFS读取时,分区将由文件在HDFS上的拆分来指定。那些分裂将在执行者中平均分配。这就是Spark最初将工作分配给所有可用执行器的方式。

     类似资料:
    • 问题内容: 我有一个程序处理大量文件,其中每个文件都需要做两件事:首先,读取并处理一部分文件,然后存储结果。第一部分可以并行化,第二部分不能并行化。 顺序执行所有操作非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待… 我做了以下 这很有帮助。但是,我想改善两点: 在获取一个固定的顺序,而不是处理任何结果,请首先执行。我该如何更改? 有成千上万的文件要处理,启动成千上万

    • 我有一个处理大量文件的程序,其中每个文件需要做两件事:首先,读取并处理文件的一部分,然后存储生成的MyFileData。第一部分可以并行,第二部分不能并行。 按顺序做每件事都非常慢,因为CPU必须等待磁盘,然后工作一点,然后发出另一个请求,然后再次等待。。。 我做了以下事情 这很有帮助。然而,我想改进两件事: > 以固定顺序执行,而不是首先处理任何可用的结果。如何更改它? 有数千个文件需要处理,启

    • 当我使用spark API运行类似的代码时,它在许多不同的(分布式)作业中运行,并且成功运行。当我运行它时,我的代码(应该做与Spark代码相同的事情),我得到一个堆栈溢出错误。知道为什么吗? 代码如下: 我相信我正在使用与spark相同的所有并行化工作,但它对我不起作用。任何关于使我的代码分发/帮助了解为什么在我的代码中发生内存溢出的建议都将是非常有帮助的

    • 问题内容: 我可以一次下载一个文件: 我可以这样尝试: 有没有不使用或作弊的并行化方法? 鉴于我现在必须诉诸“作弊”,是否是下载数据的正确方法? 使用上述方法时,它使用的是多线程而不是多核的,是否正常?有没有办法使它成为多核而不是多线程? 问题答案: 您可以使用线程池并行下载文件: 您还可以使用以下命令在一个线程中一次下载多个文件: 这里定义在哪里。

    • 假设数组如下所示: 数组中最多可以有100.000个值。 另一方面,如果我这样做: 我得到serialization异常,因为spark正在尝试序列化spark上下文,而spark上下文是不可序列化的。 如何使这个工作,但仍然利用并行性。 这是我得到的咒语: