我正在ApacheSpark上的数据库中构建一个族谱,使用递归搜索来查找数据库中每个人的最终父级(即族谱顶部的人)。
假设搜索id时返回的第一个人是正确的家长
val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {
if((personId == null) || (personId.length() == 0))
return "-1"
val personSeq = peopleById.lookup(personId)
val person = personSeq(0)
if(person.personId == "0 "|| person.id == person.parentId) {
return person.id
}
else {
return findUltimateParentId(person.parentId)
}
}
val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))
它给出以下错误
“原因:org.apache.spark.SparkException:RDD转换和操作只能由驱动程序调用,不能在其他转换中调用;例如,rdd1.map(x=
我从阅读其他类似的问题中了解到,问题是,我从Foreach循环中调用
findUltimate父母id
,如果我从shell中调用带有个人id的方法,它将返回正确的最终父id
然而,其他建议的解决方案都不适合我,或者至少我看不出如何在我的程序中实现它们,有人能帮忙吗?
通过使用SparkContext.broadcast修复此问题:
val peopleById = peopleRDD.keyBy(f => f.id)
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())
def findUltimateParentId(personId: String) : String = {
if((personId == null) || (personId.length() == 0))
return "-1"
val personOption = broadcastedPeople.value.get(personId)
if(personOption.isEmpty) {
return "0";
}
val person = personOption.get
if(person.personId == 0 || person.orgId == person.personId) {
return person.id
}
else {
return findUltimateParentId(person.parentId)
}
}
val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))
现在工作得很棒!
如果我没弄错的话——这里有一个解决方案,它适用于任何大小的输入(尽管性能可能不太好)——它在RDD上执行N次迭代,其中N是输入中“最深的家族”(从祖先到孩子的最大距离):
// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])
// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}
object RecursiveParentLookup {
// requested method
def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {
// all persons keyed by id
def byId = rdd.keyBy(_.id).cache()
// recursive function that "climbs" one generation at each iteration
def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
val cached = persons.cache()
// find which persons can climb further up family tree
val haveGrandparents = cached.filter(_.hasGrandparent)
if (haveGrandparents.isEmpty()) {
cached // we're done, return result
} else {
val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
// for those who can - join with persons to find the grandparent and attach it instead of parent
val withGrandparents = haveGrandparents
.keyBy(_.ancestor.get.parentId.get) // grandparent id
.join(byId)
.values
.map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
// call this method recursively on the result
done ++ climbOneGeneration(withGrandparents)
}
}
// call recursive method - start by assuming each person is its own parent, if it has one:
climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
}
}
这里有一个测试来更好地理解这是如何工作的:
/**
* Example input tree:
*
* 1 5
* | |
* ----- 2 ----- 6
* | |
* 3 4
*
*/
val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))
test("find ultimate parent") {
val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
val result = RecursiveParentLookup.findUltimateParent(input).collect()
result should contain theSameElementsAs Seq(
WithAncestor(person1, None),
WithAncestor(person2, Some(person1)),
WithAncestor(person3, Some(person1)),
WithAncestor(person4, Some(person1)),
WithAncestor(person5, None),
WithAncestor(person6, Some(person5))
)
}
将您的输入映射到这些Person
对象中,并将输出与祖先
对象映射到您需要的任何对象中应该很容易。请注意,此代码假定如果任何人具有parentId X,则输入中实际存在另一个具有该id的人
问题内容: 我的问题是是否有一些调试复杂的递归算法的聪明方法。假设我们有一个复杂的例子(在每个“嵌套迭代”中递归计数器都减少时,这不是简单的情况)。 我的意思是在可能发生循环时类似图的递归遍历。 我需要检查我是否在某处没有无限循环。而且仅使用调试器执行此操作并不能给出肯定的答案(因为我不确定算法是否处于无限循环中,还是只是按需进行处理)。 没有具体的例子很难解释。但是我需要的是… “要检查复杂的递
我是一名大学生,学习球拍/方案和C作为我的CS学位的入门课程。 我在网上读到,在C语言中使用迭代而不是递归通常是最佳实践,因为递归由于将堆栈帧保存到调用堆栈上而代价高昂。。。 现在,在类似于Scheme的函数式语言中,递归一直在使用。我知道尾部递归在Scheme中是一个巨大的优势,据我所知,它只需要一个堆栈帧(有人能澄清这一点吗?)无论递归有多深。 我的问题是:非尾递归呢?每个函数应用程序是否保存
我有两个非递归方法,其中一个读取字符串中的总“e”字符,另一个检查 ArrayList 是否按字母顺序排列。 递归方法的定义是方法调用自身。我相信我理解这个概念,但要实现它或将其转换为递归方法确实很困难。我怎样才能将这些方法转化为递归方法,同时我应该如何思考?此外,这是我的另一种方法,它只打印出指定数字大小的数字。 条件方法检查数字的第一个数字(从右起)是否大于第二个数字,并再次检查第二个是否大于
我正在处理我当前的任务,即创建一个LinkedList数据结构,我已经创建了它以及其他方法,它工作得非常好。我正在处理我的最后一个问题,即制作一个toString方法。它应该: toString方法返回列表的字符串表示形式。用逗号分隔每个项目,并用大括号括住这些项目,例如{1,4,7,5}。公共toString方法必须调用私有递归方法来生成以逗号分隔的项目列表。(但您可以在公共方法中添加大括号。)
我很难确定简单递归方法的大O。我不知道当一个方法被多次调用时会发生什么。我想更具体地谈谈我的困惑领域,但目前我正试图回答一些硬件问题,为了不想作弊,我要求任何回复本文的人提出一个简单的递归方法,并对所述方法的大O进行简单解释。(最好是Java语言……我正在学习的一种语言。) 谢谢你。
我在深度优先搜索算法实现的递归方法方面遇到了一些麻烦。这是二叉树照片: 该方法在树的右侧(55、89、144)工作得很好,但是当它来到左侧时,它返回nil,即使它输入“是”。那么,代码有什么问题呢?节点是Node类的一个实例,它具有值(整数)并链接到左右子级(Node类的其他实例),如果它没有来自该侧的子级,则为nil。 下面是方法代码: