当前位置: 首页 > 知识库问答 >
问题:

apachespark中的递归方法调用

公冶峰
2023-03-14

我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。

假设搜索id时返回的第一个人是正确的家长

val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if(person.personId == "0 "|| person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))

它给出以下错误

“原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,rdd1.map(x=

我从阅读其他类似的问题中了解到,问题是,我从Foreach循环中调用findUltimate父母id,如果我从shell中调用带有个人id的方法,它将返回正确的最终父id

然而,其他建议的解决方案都不适合我,或者至少我看不出如何在我的程序中实现它们,有人能帮忙吗?


共有2个答案

柴琦
2023-03-14

通过使用SparkContext.broadcast修复此问题:

val peopleById = peopleRDD.keyBy(f => f.id)
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personOption = broadcastedPeople.value.get(personId)
    if(personOption.isEmpty) {

        return "0";

    }
    val person = personOption.get
    if(person.personId == 0 || person.orgId == person.personId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))

现在工作得很棒!

西门胜涝
2023-03-14

如果我没弄错的话——这里有一个解决方案,它适用于任何大小的输入(尽管性能可能不太好)——它在RDD上执行N次迭代,其中N是输入中“最深的家族”(从祖先到孩子的最大距离):

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {

    // all persons keyed by id
    def byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)

      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - start by assuming each person is its own parent, if it has one:
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
  }

}

这里有一个测试来更好地理解这是如何工作的:

/**
  *     Example input tree:
  *
  *            1             5
  *            |             |
  *      ----- 2 -----       6
  *      |           |
  *      3           4
  *
  */

val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}

将您的输入映射到这些Person对象中,并将输出与祖先对象映射到您需要的任何对象中应该很容易。请注意,此代码假定如果任何人具有parentId X,则输入中实际存在另一个具有该id的人

 类似资料:
  • 问题内容: 我的问题是是否有一些调试复杂的递归算法的聪明方法。假设我们有一个复杂的例子(在每个“嵌套迭代”中递归计数器都减少时,这不是简单的情况)。 我的意思是在可能发生循环时类似图的递归遍历。 我需要检查我是否在某处没有无限循环。而且仅使用调试器执行此操作并不能给出肯定的答案(因为我不确定算法是否处于无限循环中,还是只是按需进行处理)。 没有具体的例子很难解释。但是我需要的是… “要检查复杂的递

  • 我是一名大学生,学习球拍/方案和C作为我的CS学位的入门课程。 我在网上读到,在C语言中使用迭代而不是递归通常是最佳实践,因为递归由于将堆栈帧保存到调用堆栈上而代价高昂。。。 现在,在类似于Scheme的函数式语言中,递归一直在使用。我知道尾部递归在Scheme中是一个巨大的优势,据我所知,它只需要一个堆栈帧(有人能澄清这一点吗?)无论递归有多深。 我的问题是:非尾递归呢?每个函数应用程序是否保存

  • 我有两个非递归方法,其中一个读取字符串中的总“e”字符,另一个检查 ArrayList 是否按字母顺序排列。 递归方法的定义是方法调用自身。我相信我理解这个概念,但要实现它或将其转换为递归方法确实很困难。我怎样才能将这些方法转化为递归方法,同时我应该如何思考?此外,这是我的另一种方法,它只打印出指定数字大小的数字。 条件方法检查数字的第一个数字(从右起)是否大于第二个数字,并再次检查第二个是否大于

  • 我正在处理我当前的任务,即创建一个LinkedList数据结构,我已经创建了它以及其他方法,它工作得非常好。我正在处理我的最后一个问题,即制作一个toString方法。它应该: toString方法返回列表的字符串表示形式。用逗号分隔每个项目,并用大括号括住这些项目,例如{1,4,7,5}。公共toString方法必须调用私有递归方法来生成以逗号分隔的项目列表。(但您可以在公共方法中添加大括号。)

  • 我很难确定简单递归方法的大O。我不知道当一个方法被多次调用时会发生什么。我想更具体地谈谈我的困惑领域,但目前我正试图回答一些硬件问题,为了不想作弊,我要求任何回复本文的人提出一个简单的递归方法,并对所述方法的大O进行简单解释。(最好是Java语言……我正在学习的一种语言。) 谢谢你。

  • 我在深度优先搜索算法实现的递归方法方面遇到了一些麻烦。这是二叉树照片: 该方法在树的右侧(55、89、144)工作得很好,但是当它来到左侧时,它返回nil,即使它输入“是”。那么,代码有什么问题呢?节点是Node类的一个实例,它具有值(整数)并链接到左右子级(Node类的其他实例),如果它没有来自该侧的子级,则为nil。 下面是方法代码: