Phi*_*kie 7 scala graph-theory apache-spark spark-graphx spark-shell
当我尝试使用Scala在Graphx中实现一个算法时,我没有发现可以激活下一个解决方案中的所有顶点.如何向所有图形顶点发送消息?在我的算法中,有一些超级步骤应该由所有顶点执行(无论它们是否接收到消息,因为即使没有接收消息也是应该在下一次迭代中处理的事件).
我在这里给出了在pregel的逻辑中实现的SSSP算法的官方代码,你可以看到只有接收消息的顶点将在下一次迭代中执行它们的程序但是对于我的情况,我希望pregel函数迭代运行,即每个超级步骤顶点执行他们的程序,他们可以投票停止,如果需要!这个例子中的推理看起来并不像Pregel的纸质逻辑.请问有关如何实现Pregel真实逻辑的任何想法?
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
Run Code Online (Sandbox Code Playgroud)
}
在阅读了 @Mahmoud Hanafy 和 @Shaido 的两个回复并确认 GraphX 中无法激活顶点或投票停止后,我尝试在算法本身中实现此逻辑。所以,这就是我所做的:
init message在第一个超级步骤中向所有图顶点发送 一个 ,它们可以在变为非活动状态之前至少执行一次例程。v可以向其邻居发送消息并等待接收来自其他顶点的消息。我再说一遍,这是我解决问题的唯一方法,但我不鼓励您使用它。