当前位置: 首页 > 知识库问答 >
问题:

Spark源代码:如何理解withScope方法

程和煦
2023-03-14

我无法理解与Scope方法的功能(实际上,我真的不知道RDD操作范围类的含义)

特别是,(正文:=)的含义是什么

private[spark] def withScope[T](
  sc: SparkContext,
  name: String,
  allowNesting: Boolean,
  ignoreParent: Boolean)(body: => T): T = {
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
  if (ignoreParent) {
    // Ignore all parent settings and scopes and start afresh with our own root scope
    sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
  } else if (sc.getLocalProperty(noOverrideKey) == null) {
    // Otherwise, set the scope only if the higher level caller allows us to do so
    sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
  }
  // Optionally disallow the child body to override our scope
  if (!allowNesting) {
    sc.setLocalProperty(noOverrideKey, "true")
  }
  body
} finally {
  // Remember to restore any state that was modified before exiting
  sc.setLocalProperty(scopeKey, oldScopeJson)
  sc.setLocalProperty(noOverrideKey, oldNoOverride)
}
}

您可以通过以下链接找到源代码:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

任何人都可以帮我吗?谢谢,我对此感到困惑了很长一段时间。

共有2个答案

穆俊哲
2023-03-14

您需要了解如何调用withScope。这是RDD.scala中的一个示例

     /**
   *  Return a new RDD by first applying a function to all elements of this
   *  RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

基本上,它创建了一个新的作用域(代码块),以便前一个函数中的变量不会与当前函数混合。作用域的主体是在withScope之后传递的内容,在本例中是

      {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
      }

我还没有达到恢复旧范围的地步。

沈子昂
2023-03-14

以下代码可能会对您有所帮助

object TestWithScope {
    def withScope(func: => String) = {
        println("withscope")
        func
    }

    def bar(foo: String) = withScope {
        println("Bar: " + foo)
        "BBBB"
    }

    def main(args: Array[String]): Unit = {
        println(bar("AAAA"));
    }
}

可能的产出

withscope
Bar: AAAA
BBBB
 类似资料:
  • 今天在研究Java中Integer.parseInt的源码时,对于int multmin = limit / radix;这一句代码不太理解,请教一下为什么通过 result < multmin可以判断是否越界? 参考资料:https://www.jianshu.com/p/da80a793dd57

  • 我想通过将spark Java/Scala api转换为dll文件来运行C#中的apache spark源代码。我引用了IKVM/IKVMC将spark jar文件转换为dll文件,但无法得到正确的结果。有没有办法在C#中运行spark源?

  • 本文向大家介绍如何把C++的源代码改写成C代码的方法,包括了如何把C++的源代码改写成C代码的方法的使用技巧和注意事项,需要的朋友参考一下 C++解释器比C语言解释器占用的存储空间要大,想要在某些特定场合兼容C++代码,同时为了节省有限的存储空间,降低成本,也为了提高效率,将用C++语言写的源程序用C语言改写是很有必要的。 C++与C区别最大的就是C++中的类的概念和特性,将C++改为C的问题,就

  • 本部分试图从专题和业务流程的角度来剖析 Neutron 代码,以便理解如此设计的内涵。

  • 问题内容: 我希望解析Java源代码文件,并提取方法源代码。 我需要这样的方法: 有没有简单的方法可以做到这一点,有一个库可以帮助我构建方法,等等? 问题答案: 从https://javaparser.org/下载Java解析器 您必须编写一些代码。此代码将调用解析器…它将返回一个CompilationUnit: 注意:SEDInputStream是输入流的子类。您可以根据需要使用FileInpu

  • 我正在努力学习Spark,到目前为止进展顺利,除了我需要在值为列表的一对RDD上使用诸如还原ByKey或组合ByKey之类的函数的问题。 我一直试图为这些函数找到详细的留档,这可以解释参数实际上是什么,这样我就可以自己解决它,而不必去Stack Overflow,但我就是找不到任何好的Spark留档。我读过Learning Spark的第3章和第4章,但老实说,对最复杂函数的解释非常糟糕。 我现在