当前位置: 首页 > 知识库问答 >
问题:

(为什么)我们需要在RDD上调用缓存或持久化吗

高经艺
2023-03-14

当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用“缓存”或“持久化”来将RDD数据存储到内存中?或者RDD数据默认以分布式方式存储在内存中?

val textFile = sc.textFile("/user/emp.txt")

根据我的理解,在上述步骤之后,text File是一个RDD,并且在所有/部分节点的内存中可用。

如果是这样,那么为什么我们需要在文本文件RDD上调用“缓存”或“持久化”?

共有3个答案

长孙阳焱
2023-03-14

我们是否需要显式地调用“缓存”或“持久化”来将RDD数据存储到内存中?

是的,只有在需要的时候。

默认情况下,RDD数据以分布式方式存储在内存中?

不!

原因如下:

>

  • Spark支持两种类型的共享变量:广播变量,可用于在所有节点上缓存内存中的值,以及累加器,它们是仅“添加”到的变量,例如计数器和总和。

    RDD支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在数据集上运行计算后向驱动程序返回值)。例如,map是一种转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。另一方面,reduce是一个操作,它使用一些函数聚合RDD的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行reduceByKey返回分布式数据集)。

    Spark中的所有转换都是懒惰的,因为它们不会立即计算结果。相反,他们只记得应用于某些基本数据集(例如文件)的转换。仅当操作需要将结果返回到驱动程序时,才计算转换。这种设计使Spark能够更高效地运行–例如,我们可以意识到,通过map创建的数据集将用于reduce,并且只将reduce的结果返回给驱动程序,而不是更大的映射数据集。

    默认情况下,每次对每个转换的RDD运行操作时,都可能会重新计算它。但是,您也可以使用persist(或cache)方法在内存中持久化RDD,在这种情况下,Spark将保留集群中的元素,以便下次查询时更快地访问它。还支持在磁盘上持久化RDD,或跨多个节点进行复制。

    有关更多详细信息,请查看《Spark编程指南》。

  • 蒋星雨
    2023-03-14

    我认为这个问题最好表述为:

    Spark进程是懒惰的,也就是说,在需要之前什么都不会发生。为了快速回答这个问题,在发出val textFile=sc.textFile(“/user/emp.txt”)后,数据不会发生任何变化,只构建了一个HadoopRDD,将该文件用作源。

    假设我们对数据进行了一点转换:

    val wordsRDD = textFile.flatMap(line => line.split("\\W"))
    

    同样,数据没有任何变化。现在有了一个新的RDD,它包含了对测试文件的引用和一个需要时应用的函数。

    只有当对RDD调用操作时,例如wordsRDD. count,才会执行称为沿袭的RDD链。也就是说,按分区分解的数据将由Spark集群的执行器加载,将应用平面图函数并计算结果。

    在线性沿袭上,如本例中的沿袭,不需要缓存()。数据将加载到执行器,应用所有转换,最后计算计数,全部在内存中-如果数据适合内存。

    缓存在RDD的沿袭分支时很有用。假设您想将上一个示例的单词过滤为正负单词的计数。您可以这样做:

    val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
    val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
    

    在这里,每个分支都会重新加载数据。添加显式cache语句将确保保留和重用以前完成的处理。该作业如下所示:

    val textFile = sc.textFile("/user/emp.txt")
    val wordsRDD = textFile.flatMap(line => line.split("\\W"))
    wordsRDD.cache()
    val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
    val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
    

    出于这个原因,cache被称为“破坏沿袭”,因为它创建了一个可以重用以进行进一步处理的检查点。

    经验法则:当RDD的沿袭分支出来或RDD像在循环中一样多次使用时,请使用缓存。

    糜正业
    2023-03-14

    大多数RDD操作都是懒惰的。可以将RDD视为一系列操作的描述。RDD不是数据。所以这一行:

    val textFile = sc.textFile("/user/emp.txt")
    

    它什么都不做。它创建了一个RDD,上面写着“我们需要加载这个文件”。此时未加载文件。

    需要观察数据内容的RDD操作不能是懒惰的。(这些称为操作。)例如,RDD。计数-要告诉您文件中的行数,需要读取文件。因此,如果您编写文本文件。计数,此时将读取文件,对行进行计数,并返回计数。

    如果您再次调用text File.count怎么办?同样的事情:文件将被读取并再次计数。什么都没有存储。RDD不是数据。

    那么RDD. cache是做什么的呢?如果您将text File.cache添加到上面的代码中:

    val textFile = sc.textFile("/user/emp.txt")
    textFile.cache
    

    它什么也不做。RDD. cache也是一个惰性操作。文件仍然没有被读取。但是现在RDD说“读取这个文件,然后缓存内容”。如果您第一次运行text File.count,文件将被加载、缓存和计数。如果您第二次调用text File.count,操作将使用缓存。它只是从缓存中获取数据并计数行。

    缓存行为取决于可用内存。例如,如果该文件在内存中不合适,则使用文本文件。计数将返回到通常的行为并重新读取文件。

     类似资料:
    • 和RDD相似,DStreams也允许开发者持久化流数据到内存中。在DStream上使用persist()方法可以自动地持久化DStream中的RDD到内存中。如果DStream中的数据需要计算多次,这是非常有用的。像reduceByWindow和reduceByKeyAndWindow这种窗口操作、updateStateByKey这种基于状态的操作,持久化是默认的,不需要开发者调用persist(

    • 问题内容: 训练期间需要调用该方法。但是文档不是很有帮助 为什么我们需要调用此方法? 问题答案: 在中,我们需要在开始进行反向传播之前将梯度设置为零,因为PyTorch 会 在随后的向后传递中 累积梯度 。在训练RNN时这很方便。因此,默认操作是在每次调用时累积(即求和)梯度。 因此,理想情况下,当您开始训练循环时,应该正确进行参数更新。否则,梯度将指向预期方向以外的其他方向,即朝向 最小值 (或

    • Spark通过在操作中将其持久保存在内存中,提供了一种处理数据集的便捷方式。在持久化RDD的同时,每个节点都存储它在内存中计算的任何分区。也可以在该数据集的其他任务中重用它们。 我们可以使用或方法来标记要保留的RDD。Spark的缓存是容错的。在任何情况下,如果RDD的分区丢失,它将使用最初创建它的转换自动重新计算。 存在可用于存储持久RDD的不同存储级别。通过将对象(Scala,Java,Pyt

    • Spark 有一个最重要的功能是在内存中_持久化_ (或 缓存)一个数据集。

    • Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。 你能通过persi

    • 问题内容: 我开始使用RxJS,但我不明白为什么在此示例中我们需要使用类似or 的函数;数组的数组在哪里? 如果有人可以直观地解释正在发生的事情,那将非常有帮助。 问题答案: 当您有一个Observable的结果是更多Observable时,可以使用flatMap。 如果您有一个由另一个可观察对象产生的可观察对象,则您不能直接过滤,缩小或映射它,因为您有一个可观察对象而不是数据。如果您生成一个可观