我正在尝试使用 Pyspark 和 graphframes 的 pregel 包装器来实现 Rocha & Thatte 的算法(http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf )。在这里,我遇到了消息聚合的正确语法问题。
这个想法是直截了当的:
...在每次传递中,G 的每个活动顶点都会向其外围邻居发送一组顶点序列,如下所述。在第一遍中,每个顶点 v 向其所有邻居发送消息 (v)。在后续迭代中,每个活动顶点 v 将 v 附加到它在上一次迭代中接收到的每个序列。然后它将所有更新的序列发送到其外围邻居。如果 v 在上一次迭代中没有收到任何消息,则 v 将自行停用。当所有顶点都已停用时,算法终止。...
我的想法是将顶点 id 发送到目标顶点(dst),并在聚合函数中将它们收集到一个列表中。然后,在我的顶点列“序列”中,我想将这个新列表项与现有列表项追加/合并,然后使用 when 语句检查当前顶点 id 是否已在序列中。然后我可以根据顶点列将顶点设置为 true 以将它们标记为循环。但我在 Spark 中找不到关于如何连接它的正确语法。有人有想法吗?或者实施类似的东西?
我当前的代码
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when
from graphframes import GraphFrame
from graphframes.lib import *
SimpleCycle=[
("1","2"),
("2","3"),
("3","4"),
("4","5"),
("5","2"),
("5","6") …Run Code Online (Sandbox Code Playgroud)