当前位置: 首页 > 知识库问答 >
问题:

用spark和graphx中的mapReduceTriplets对图形数据应用函数

殳俊
2023-03-14

我一直在遵循教程,并读取了我自己的数据,这些数据被组合成[array[String],int],因此,例如,我的顶点是:

org.apache.spark.graphx.vertexrdd[Array[String]]例如(3999,Array(17,Low,9))

我的边缘是:

因此,您最终会得到一个匹配或不匹配数量的计数列表。

我遇到的问题是使用mapReduceTriplets应用任何函数,我对scala很陌生,所以这可能非常明显,但在graphx教程中有一个示例,它使用了一个格式为graph[Double,Int]的图,而我的图的格式是graph[array[String],Int],所以作为第一步,我只是试图弄清楚如何在示例中使用我的图,然后从那里开始工作。

graphx网站上的示例如下:

    val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      Iterator((triplet.dstId, (1, triplet.srcAttr)))
    } else {
      // Don't send a message for this triplet
      Iterator.empty
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
val nodes = (sc.textFile("C~nodeData.csv")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

val edges = GraphLoader.edgeListFile(sc, "C:~edges.txt")


val graph = edges.outerJoinVertices(nodes) {
case (uid, deg, Some(attrList)) => attrList
case (uid, deg, None) => Array.empty[String]
}


val countsRdd = graph.collectNeighbors(EdgeDirection.Either).leftOuterJoin(graph.vertices).map {
  case (id, t) => {
    val neighbors: Array[(VertexId, Array[String])] = t._1
    val nodeAttr = (t._2)
    neighbors.map(_._2).count( x => x.apply(x.size - 1) == nodeAttr(0))

  }
}

共有1个答案

夔宏深
2023-03-14

我认为您应该使用grapops.collectneights而不是MapReduceTripletsAggregateMessages

CollectNeighbors将为您提供一个RDD,对于图中的每个顶点,将连接的节点作为一个数组。只需根据您的需要减少数组。类似于:

val countsRdd = graph.collectNeighbors(EdgeDirection.Either)
  .join(graph.vertices)
  .map{ case (vid,t) => {
    val neighbors = t._1
    val nodeAttr = t._2
    neighbors.map(_._2).filter( <add logic here> ).size
  }

如果这不能让你朝着正确的方向前进,或者你被卡住了,让我知道(例如“”部分)。

 类似资料:
  • 我是熊猫数据框的新手,我想应用一个函数,在同一列中取几行。就像当你应用函数diff(),但我想计算文本之间的距离。所以我定义了一个测量距离的函数,我试图使用应用,但我不知道如何选择几行。下面我展示了一个我尝试过的例子和我所期望的: 但它不起作用。我想得到的是: 提前感谢您为我提供的任何帮助。

  • 希望有人能帮忙。 我试图编写一个程序,它需要在Graphx上对连接到网络中每个节点的每个边缘ID执行一个函数。 但是,只有在添加collect函数从rdd中收集图形数据时,它才会起作用。 网络太大,无法收集边缘数据,因此任何帮助都将非常感谢。

  • 我希望有人能对以下问题提出一些建议,我最近在类似的问题上得到了很大的帮助,并想进一步讨论它。 我目前有一个使用graphx构建的网络,如下所示(只有更多的顶点和边) 1002,1,0 1003,2,1 1004、3、2 1003、1004、7 1004、1005、3 1002、1006、5 null

  • 主要内容:1.PageRank,2.Pregel1.PageRank 历史上,PageRank算法作为计算互联网网页重要度的算法被提出。PageRank是定义在网页集合上的一个函数,它对每个网页给出一个正实数,表示网页的重要程度,整体构成一个向量,PageRank值越高,网页就越重要,在互联网搜索的排序中可能就被排在前面。 直观上,一个网页,如果指向该网页的超链接越多,随机跳转到该网页的概率也就越高,该网页的PageRank值就越高,这个网页也

  • 主要内容:1.基本概念,2.GraphX简介,3.创建Graph 对象,4.Graph Api1.基本概念 图(Graph)由顶点(Vertex)和边(Edge)组成 图根据边是否有方向,可以分为有向图和无向图 有环图和无环图: 2.GraphX简介 SparkGraphX是Spark提供的分布式图计算API,通过弹性分布式属性图(Property Graph)统一了图试图和表视图,可以与Spark Streaming、Spark SQL和Spark MLlib无缝衔接。 对graph视图