当前位置: 首页 > 知识库问答 >
问题:

Spark的过滤操作如何在GraphX边缘上工作?

翁和颂
2023-03-14

我对Spark很陌生,不太了解基础知识,我只是为了解决一个问题而跳入其中。该问题的解决方案包括制作一个边具有字符串属性的图(使用GraphX)。用户可能希望查询这个图,我通过只过滤那些具有字符串属性的边来处理查询,该属性等于用户的查询。

现在,我的图形有超过1600万条边;当我使用计算机的所有8个核心时,创建图形需要10分钟以上。然而,当我查询这个图时(就像我上面提到的那样),我会立即得到结果(令我惊喜)。

所以,我的问题是,过滤器操作到底是如何搜索我所查询的边缘的?它是否迭代地查看它们?是在多个核心上搜索边缘,而且看起来很快吗?还是涉及到某种散列?

下面是如何使用filter:mygraph.edges.filter(_.attr(0).equals(“cat”))的一个示例,这意味着我希望检索具有属性“cat”的边。边缘搜索得怎么样了?

共有1个答案

轩辕实
2023-03-14

运行语句返回速度如此之快,因为它实际上并不执行筛选。Spark使用惰性求值:直到您执行一个实际收集结果的操作,它才真正执行转换。调用转换方法(如filter)只需创建一个表示此转换及其结果的新RDD。您必须执行诸如collectcount之类的操作才能实际执行:

scala prettyprint-override">def myGraph: Graph = ???

// No filtering actually happens yet here, the results aren't needed yet so Spark is lazy and doesn't do anything
val filteredEdges = myGraph.edges.filter()

// Counting how many edges are left requires the results to actually be instantiated, so this fires off the actual filtering
println(filteredEdges.count)

// Actually gathering all results also requires the filtering to be done
val collectedFilteredEdges = filteredEdges.collect

请注意,在这些示例中,筛选结果不存储在两者之间:由于惰性,对两个操作都重复筛选。为了防止这种重复,在阅读了转换和操作的详细信息以及Spark在幕后的实际操作之后,您应该研究Spark的缓存功能:https://Spark.apache.org/docs/latest/programming-guide.html#rdd-operations。

在Spark GraphX中,边存储在edgerdd[ED]类型的RDD中,其中ED是边缘属性的类型,在您的例子中是string。这个特殊的RDD在后台进行了一些特殊的优化,但出于您的目的,它的行为类似于它的超类RDD[edge[ED]],筛选类似于筛选任何RDD:它将遍历所有项,并对每个项应用给定的谓词。然而,一个RDD被拆分为许多分区,Spark将并行过滤多个分区;在您的示例中,如果您似乎在本地运行Spark,它将并行执行与您拥有的内核数量一样多的操作,或者与您用-例如master local[4]显式指定的内核数量一样多的操作。

带边的RDD根据设置的PartitionStrategy进行分区,例如,如果您使用graph.FromedGetUples创建图形,或者通过调用图形的PartitionBy来创建图形。然而,所有策略都基于边缘的顶点,因此不需要了解您的属性,因此不会影响您的过滤操作,除非在集群上运行它,否则可能会出现一些不平衡的网络负载,所有“cat”边缘最终会出现在同一个分区/执行器中,然后执行collect或一些shuffle操作。有关如何表示和划分图的更多信息,请参见关于顶点和边RDDs的GraphX文档。

 类似资料:
  • 我试图使用Gremlin从一个起始节点向外遍历到连接X度内的所有连接节点。连接的方向无关紧要,所以我使用了函数。我还希望能够防止遍历与特定标签相交。这是一个示例图。 到目前为止,我进行的遍历如下所示: 然而,这并不是我所寻找的。我想要一些实际上可以防止遍历者在必须跨越指定边缘时触及顶点的东西。我当前的实现过滤具有传入边缘的顶点,但在某些情况下,如果遍历者跨越不同的边缘到达那里,我可能仍然希望该顶点

  • 这是怎么工作https://graph.microsoft.com/v1.0/me/onenote/notebooks?filter=tolower(名)eq'我的笔记本' 来自图形资源管理器的屏幕截图 当我尝试在C#中编写相同的代码时,我得到了以下错误代码:BadRequest消息:无效筛选子句内部错误:AdditionalData:日期:2020-09-02T20:01:24 我正在使用下面的

  • 我想知道是否有人能帮我,我遇到了一个问题,在spark中为graphx编写的函数,如果我有没有边的顶点,它总是给出错误消息。

  • TakeLast 发射Observable发射的最后N项数据 使用TakeLast操作符修改原始Observable,你可以只发射Observable’发射的后N项数据,忽略前面的数据。 taskLast.n 使用takeLast操作符,你可以只发射原始Observable发射的后N项数据,忽略之前的数据。注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。 takeLast的

  • Take 只发射前面的N项数据 使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。 RxJava将这个操作符实现为take函数。 如果你对一个Observable使用take(n)(或它的同义词limit(n))操作符,而那个Observable发射的数据少于N项,那么take操作生成的Observable不会抛异常或发射onErro

  • SkipLast 抑制Observable发射的后N项数据 使用SkipLast操作符修改原始Observable,你可以忽略Observable’发射的后N项数据,只保留前面的数据。 使用SkipLast操作符,你可以忽略原始Observable发射的后N项数据,只保留之前的数据。注意:这个机制是这样实现的:延迟原始Observable发射的任何数据项,直到它发射了N项数据。 skipLast的