val graph=GraphLoader.edgeListFile(sc,"/home/spark/spark/graphx/data/followers.txt")//加载边时顶点是边上出现的点,定点默认数据是1,注意文件格式:1 2,中间是空格graphx只会读取两列分别作为源顶点和目标顶点,如:1 2 other,第三列的other直接被忽略
val users = sc.textFile("/home/spark/spark/graphx/data/users.txt").map { line=>
val fields = line.split(",")
(fields(0).toLong,(fields(1),fields(2)))//解析顶点数据:ID(一定转成Long型),fisrt name,full name
}
val myGraph=Graph.apply(users,graph.edges)//由于graph默认将顶点数据设为1,将顶点数据users和边数据graph.edges重构为新图,如果边edges中的顶点A在顶点集合users中没有,则该顶点A将会以默认值初始化,可以添加默认值
如:val defaultUsers=("first name","full name")
val myGraph=Graph.apply(users,graph.edges,defaultUsers)
graph.vertices.filter{case(id,(firstName,fullName)=>firstName==”BarackObama”}//针对每个顶点进行filter操作,id是顶点ID,(firstName,fullName)是顶点数据
graph.edges.filter{case(src,dst,prop)=>prop==1}//边数据的过滤,src表示源顶点ID,dst表示目标顶点ID,prop表示边上数据
graph.triplet.map(tri=>tri.srcId+” “+tri.srcAttr+” “+tri.dstId+” “+tri.dstAttr+” “+tri.attr+”\n”) //每个triplet进行map操作,srcId,srcAttr,dstId,dstAttr,attr分别表示源顶点标号ID、源顶点数据、目标顶点ID、目标顶点数据、边数据
graph.vertices.mapValues[Int]((id:VertexId,attr:(String,String))=>10)//针对VertexRDD操作
graph.vertices.saveAsTextFile(“pathToFile”) //顶点数据存入图文件,参数是一个目录,类似graph.edges.saveAsTextFile
graph.mapVertices[VD2])((id:VertexId,attr:VD)=>VD2) //VD2要和图的VD匹配(VD是图的顶点数据不包括ID,ED是边的数据不包括srcID和dstID)
如:myGraph.mapVertices[(String,String)]((id,(fistName,fullName))=>(firstName.toUpperCase,fullName))//将顶点数据的firstName改为大写
graph.mapEdges(e=>e.attr+10) //针对每条边进行map操作并返回边集合,e的类型类似于(src,dst,prop)包含了源顶点ID,目标顶点ID,边数据
graph.mapTriplets(triplet=>triplet.attr+10) //针对图中每个triplet进行map操作
graph.reverse //所有边反向
graph.mask[VD2,ED2](other:Graph[VD2,ED2]):Graph[Vd,ED]//返回graph和other的交集
graph.subgraph(edge=>true,(id,prop)=>prop==1)//返回边满足条件(这里是true全部边都满足)及顶点数据为1的子图
graph.joinVertices(other:RDD[(VertexId,U))(map:(VertexId,VD,U)=>VD):Graph[VD,ED] //graph和other相交的顶点执行map函数,在graph中但是不在other中的地点保持不变
graph.outerJoinVertices[U,VD2](other:RDD[(VertexId,U))(map:(VertexId,VD,option)=>VD2):Graph[VD2,ED] //和joinVertices类似,但不同的是在graph中但不在other中的顶点也要执行map函数
graph.mapReduceTriplets[A](map:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],reduce:(A,A)=>A):VertexRDD[A] //针对每个triplet执行map函数(发送消息,暂时只能单向传递消息,即所有的triplet执行map时要么都是想源顶点发送消息,要么都向目标顶点发送消息),并由reduce收集发送给顶点的消息
graph.inDegrees //返回图的入度,类型为VertexRDD[Int]
graph.outDegrees //返回出度,(VertexID,Int)
graph.Degrees //返回顶点的度
graph.collectNeighbors(edgeDirection:EdgeDirection):VertexRDD[Array[(VertexId,VD)]] //收集每个顶点的邻居顶点数据,返回的是一个数组,数组元素是邻居顶点ID和其顶点数据
EdgeDirection.Out //出边方向
EdgeDirection.In //入边方向
EdgeDirection.Either //出边或入边方向
EdgeDirection.Both //出边和入边方向
graph.pregel[A]
(
initialMsg:A,//初始消息
maxIter:Int=Int.MaxValue,//最大迭代次数
activeDir:EdgeDirection=EdgeDirection.Out
)//消息传递方向
(
vprog:(VertexId,VD,A)=>VD,//顶点程序
sendMsg:EdgeTriplet[VD,ED]=>Iterator[(VertexId,A)],//发送消息
mergeMsg:(A,A)=>A
):Graph[VD,ED]//汇集消息
graph.pageRank(0.0001) //计算PageRank值,针对非联通图也可以
前面我所引用的数据格式如下:
users.txt: 第一列为ID,第二列为fistName,第二列为fullName
1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys,xxoo
followers.txt如下:
2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7