当前位置: 首页 > 知识库问答 >
问题:

Spark,Graphx程序不使用cpu和内存

胡天佑
2023-03-14

我有一个函数,它获取一个节点的邻居,对于邻居,我使用广播变量和节点本身的id,它计算该节点的贴近度中心度。我用该函数的结果映射图的每个节点。当我打开任务管理器时,cpu根本没有被利用,就像它没有并行工作一样,内存也是一样,但是每个节点都是并行执行功能的,而且数据很大,完成起来需要时间,并不是不需要资源。非常感谢大家的帮助,谢谢。对于加载图形,我使用val graph=graphloader.edgelistfile(sc,path).cache

object ClosenessCentrality {

  case class Vertex(id: VertexId)

  def run(graph: Graph[Int, Float],sc: SparkContext): Unit = {
    //Have to reverse edges and make graph undirected because is bipartite
    val neighbors = CollectNeighbors.collectWeightedNeighbors(graph).collectAsMap()
    val bNeighbors = sc.broadcast(neighbors)

    val result = graph.vertices.map(f => shortestPaths(f._1,bNeighbors.value))
    //result.coalesce(1)
    result.count()

  }

  def shortestPaths(source: VertexId,  neighbors: Map[VertexId, Map[VertexId, Float]]): Double ={
    val predecessors = new mutable.HashMap[VertexId, ListBuffer[VertexId]]()
    val distances = new mutable.HashMap[VertexId, Double]()
    val q = new FibonacciHeap[Vertex]
    val nodes = new mutable.HashMap[VertexId, FibonacciHeap.Node[Vertex]]()

    distances.put(source, 0)

    for (w <- neighbors) {
      if (w._1 != source)
        distances.put(w._1, Int.MaxValue)

      predecessors.put(w._1, ListBuffer[VertexId]())
      val node = q.insert(Vertex(w._1), distances(w._1))
      nodes.put(w._1, node)
    }

    while (!q.isEmpty) {
      val u = q.minNode
      val node = u.data.id
      q.removeMin()
      //discover paths
      //println("Current node is:"+node+" "+neighbors(node).size)
      for (w <- neighbors(node).keys) {
        //print("Neighbor is"+w)
        val alt = distances(node) + neighbors(node)(w)
//        if (distances(w) > alt) {
//          distances(w) = alt
//          q.decreaseKey(nodes(w), alt)
//        }
//        if (distances(w) == alt)
//          predecessors(w).+=(node)
         if(alt< distances(w)){
           distances(w) = alt
           predecessors(w).+=(node)
           q.decreaseKey(nodes(w), alt)
         }

      }//For
    }
    val sum = distances.values.sum
    sum
  }

共有1个答案

钱德海
2023-03-14

为了回答您最初的问题,我怀疑您的RDD只有一个分区,因此使用一个内核进行处理。

edgelistfile方法有一个参数来指定所需的最小分区数。此外,还可以使用repartition获得更多分区。

您提到了Coalesce,但这在默认情况下只会减少分区的数量,请参见以下问题:Spark Coalesce More分区

 类似资料:
  • 问题内容: 说“用cpu = 800和内存= 1024运行myApp.jar” 我从事Java编程已经很多年了,问这个问题很尴尬。我什至不知道这是否可能。如果是这样,怎么办? 我只想知道是否可以设置Java程序的最大内存和cpu使用率。我突然想到了这个,因为我最近开始开发移动应用程序。我想知道该应用程序在内存和处理器非常有限的设备上的表现。 我看到带有演示应用程序的物理引擎可以在浏览器上运行,也可

  • 主要内容:1.PageRank,2.Pregel1.PageRank 历史上,PageRank算法作为计算互联网网页重要度的算法被提出。PageRank是定义在网页集合上的一个函数,它对每个网页给出一个正实数,表示网页的重要程度,整体构成一个向量,PageRank值越高,网页就越重要,在互联网搜索的排序中可能就被排在前面。 直观上,一个网页,如果指向该网页的超链接越多,随机跳转到该网页的概率也就越高,该网页的PageRank值就越高,这个网页也

  • 主要内容:1.基本概念,2.GraphX简介,3.创建Graph 对象,4.Graph Api1.基本概念 图(Graph)由顶点(Vertex)和边(Edge)组成 图根据边是否有方向,可以分为有向图和无向图 有环图和无环图: 2.GraphX简介 SparkGraphX是Spark提供的分布式图计算API,通过弹性分布式属性图(Property Graph)统一了图试图和表视图,可以与Spark Streaming、Spark SQL和Spark MLlib无缝衔接。 对graph视图

  • 我是否正确理解了客户端模式的文档? 客户端模式与驱动程序在应用程序主程序中运行的集群模式相反? 在客户端模式下,驱动程序和应用程序主程序是独立的进程,因此+必须小于计算机的内存? 在客户端模式下,驱动程序内存不包括在应用程序主内存设置中吗?

  • 我是火花操作员的新手,我不知道如何在YAML文件中设置资源请求和限制,例如在我的情况下,我有驱动程序pod的内存请求512m,但限制呢,它是无界的? 规格:驱动程序:核心:1核心限制:200m内存:512m标签:版本:2.4.5服务帐户:火花

  • Spark GraphX是一个新的Spark API,它用于图和分布式图(graph-parallel)的计算。GraphX 综合了 Pregel 和 GraphLab 两者的优点,即接口相对简单,又保证性能,可以应对点分割的图存储模式,胜任符合幂律分布的自然图的大型计算。 本专题会详细介绍GraphX的实现原理,并对GraphX的存储结构以及部分操作作详细分析。