我知道在GraphX中我们可以合并两个图形以便例如更新现有网络......但是,作为更新网络的通常操作是将单个节点插入其中.怎么可以在GraphX中进行这样的更新操作?谢谢 !
我在Spark GraphX(Scala)中有一个有向图G. 我想找到应该从已知顶点v1
开始到达另一个顶点的边数v2
.换句话说,我需要从顶点v1
到顶点的最短路径以v2
边数计算(不使用边的权重).
我正在查看GraphX文档,但我无法找到方法来执行此操作.如果图形具有树结构,则还需要这样来计算图形的深度.他们是一个简单的方法吗?
我正在运行教程 http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html
在某些时候我们会使用mapReduceTriplets 操作。这将返回预期结果
// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
// For each edge send a message to the destination vertex with the attribute of the source vertex
edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
// To combine messages take the message for the older follower
(a, b) => if (a._2 > b._2) a else b
)
Run Code Online (Sandbox Code Playgroud)
但 IntelliJ 指出 mapReduceTriplets 已被弃用(从 1.2.0 开始),应该由aggregateMessages 替换
// Find the oldest follower for each user …
Run Code Online (Sandbox Code Playgroud) Spark 的 Graphx 中的 mapTriplet 操作可以将三元组转换为定义描述的其他形式:
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
Run Code Online (Sandbox Code Playgroud)
我的数据是稀疏二部图,边的顶点数据会在每次迭代中更新。例如,这里是一个边缘(srcAttr, dstAttr, attr)
,的顶点srcAttr
和dstAttr
将根据被修改attr
。因此,我需要的是获取所有(srcAttr, dstAttr, attr)
组合,并用于attr
更新vertices
.
Graphx 提供了mapTriplets
可以转换所有(srcAttr, dstAttr, attr) 组合的方法,但是在执行这个方法时我不知道如何修改顶点。
那么,有没有什么策略可以在遍历所有边时修改顶点呢?
可以在GraphX顶点中存储多少属性(属性键:值对)?
val vertexArray = Array(
(1L, ("Name", "Alice"), ("age", 28), ("major", "ECE")),
(2L, ("Name", "John"), ("age", 23), ("major", "History")),
(3L, ("Name", "Mark"), ("age", 34), ("major", "Education"))
)
val edgeArray = Array(
Edge(1L, 3L, "cousin"),
Edge(1L, 2L, "spouse")
)
val vertexRDD = sc.parallelize(vertexArray)
val edgeRDD = sc.parallelize(edgeArray)
val graph = Graph(vertexRDD, edgeRDD)
Run Code Online (Sandbox Code Playgroud)
上面的代码在创建图形时给出了错误.
Error:(28, 21) type mismatch;
found : org.apache.spark.rdd.RDD[(Long, (String, String), (String, Int), (String, String))]
required: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, ?)]
(which expands to) org.apache.spark.rdd.RDD[(Long, ?)]
Error occurred in an application involving default …
Run Code Online (Sandbox Code Playgroud)