Ale*_*ner 3 graph-theory spark-graphx pyspark graphframes pregel
我正在尝试使用 Pyspark 和 graphframes 的 pregel 包装器来实现 Rocha & Thatte 的算法(http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf )。在这里,我遇到了消息聚合的正确语法问题。
这个想法是直截了当的:
...在每次传递中,G 的每个活动顶点都会向其外围邻居发送一组顶点序列,如下所述。在第一遍中,每个顶点 v 向其所有邻居发送消息 (v)。在后续迭代中,每个活动顶点 v 将 v 附加到它在上一次迭代中接收到的每个序列。然后它将所有更新的序列发送到其外围邻居。如果 v 在上一次迭代中没有收到任何消息,则 v 将自行停用。当所有顶点都已停用时,算法终止。...
我的想法是将顶点 id 发送到目标顶点(dst),并在聚合函数中将它们收集到一个列表中。然后,在我的顶点列“序列”中,我想将这个新列表项与现有列表项追加/合并,然后使用 when 语句检查当前顶点 id 是否已在序列中。然后我可以根据顶点列将顶点设置为 true 以将它们标记为循环。但我在 Spark 中找不到关于如何连接它的正确语法。有人有想法吗?或者实施类似的东西?
我当前的代码
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when
from graphframes import GraphFrame
from graphframes.lib import *
SimpleCycle=[
("1","2"),
("2","3"),
("3","4"),
("4","5"),
("5","2"),
("5","6")
]
edges = sqlContext.createDataFrame(SimpleCycle,["src","dst"]) \
.withColumn("self_loop",when(col("src")==col("dst"),True).otherwise(False))
edges.show()
+---+---+---------+
|src|dst|self_loop|
+---+---+---------+
| 1| 2| false|
| 2| 3| false|
| 3| 4| false|
| 4| 5| false|
| 5| 2| false|
| 5| 6| false|
+---+---+---------+
vertices=edges.select("src").union(edges.select("dst")).distinct().distinct().withColumnRenamed('src', 'id')
#vertices = spark.createDataFrame([[1], [2], [3], [4],[5],[6],[7],[8],[9]], ["id"])
#vertices.sort("id").show()
graph = GraphFrame(vertices, edges)
cycles=graph.pregel \
.setMaxIter(5) \
.withVertexColumn("is_cycle", lit(""),lit("logic to be added")) \
.withVertexColumn("sequence", lit(""),Pregel.msg()) \
.sendMsgToDst(Pregel.src("id")) \
.aggMsgs(f.collect_list(Pregel.msg())) \
.run()
cycles.show()
+---+-----------------+--------+
| id| is_cycle|sequence|
+---+-----------------+--------+
| 3|logic to be added| [2]|
| 5|logic to be added| [4]|
| 6|logic to be added| [5]|
| 1|logic to be added| null|
| 4|logic to be added| [3]|
| 2|logic to be added| [5, 1]|
+---+-----------------+--------+
Run Code Online (Sandbox Code Playgroud)
代码不起作用,但我认为逻辑应该是这样
cycles=graph.pregel \
.setMaxIter(5) \
.withVertexColumn("is_cycle", lit(""), \
when(Pregel.src("id").isin(Pregel.src(sequence)),True).otherwise(False) \
.withVertexColumn("sequence", lit("null"),Append_To_Existing_List(Pregel.msg()) \
.sendMsgToDst(
when(Pregel.src("sequence").isNull(),Pregel.src("id")) \
.otherwise(Pregel.src("sequence")) \
.aggMsgs(f.collect_list(Pregel.msg())) \
.run()
# I would like to have a result like
+---+-----------------+---------+
| id| is_cycle|sequence |
+---+-----------------+---------+
| 1|false | [1] |
| 2|true |[2,3,4,5]|
| 3|true |[2,3,4,5]|
| 4|true |[2,3,4,5]|
| 5|true |[2,3,4,5]|
| 6|false | null |
+---+-----------------+---------+
Run Code Online (Sandbox Code Playgroud)
最后,我不是通过 pregel 而是使用 graphframe/graphX 的底层消息聚合函数实现了 Rocha-Thatte 算法。如果有人感兴趣,我想分享解决方案
\n该解决方案工作正常,并且可以处理非常大的图形而不会失败\n但是,如果循环长度或图形很长,它会变得非常慢。\n现在不知道如何改进它。\n可能以智能方式使用检查点或广播
\n对任何改进意见感到高兴
\n# spark modules\nfrom pyspark import SparkContext, SparkConf\nfrom pyspark.sql import SparkSession\nfrom pyspark.sql import SQLContext\nfrom pyspark.sql.types import *\nfrom pyspark.sql import Row\nfrom pyspark.sql.window import Window\nimport pyspark.sql.functions as f\n\n# graphframes modules\nfrom graphframes import GraphFrame\nfrom graphframes.lib import *\nAM=AggregateMessages\n\n\ndef find_cycles(sqlContext,sc,vertices,edges,max_iter=100000):\n\n # Cycle detection via message aggregation\n """\n This code is an implementation of the Rocha-Thatte algorithm for large-scale sparce graphs\n\n Source:\n ==============\n wiki: https://en.wikipedia.org/wiki/Rocha%E2%80%93Thatte_cycle_detection_algorithm\n paper: https://www.researchgate.net/publication/283642998_Distributed_cycle_detection_in_large-scale_sparse_graphs\n\n The basic idea:\n ===============\n We propose a general algorithm for detecting cycles in a directed graph G by message passing among its vertices, \n based on the bulk synchronous message passing abstraction. This is a vertex-centric approach in which the vertices \n of the graph work together for detecting cycles. The bulk synchronous parallel model consists of a sequence of iterations, \n in each of which a vertex can receive messages sent by other vertices in the previous iteration, and send messages to other \n vertices.\n In each pass, each active vertex of G sends a set of sequences of vertices to its out- neighbours as described next. \n In the first pass, each vertex v sends the message (v) to all its out- neighbours. In subsequent iterations, each active vertex v \n appends v to each sequence it received in the previous iteration. It then sends all the updated sequences to its out-neighbours. \n If v has not received any message in the previous iteration, then v deactivates itself. The algorithm terminates when all the \n vertices have been deactivated.\n For a sequence (v1, v2, . . . , vk) received by vertex v, the appended sequence is not for- warded in two cases: (i) if v = v1, \n then v has detected a cycle, which is reported (see line 9 of Algorithm 1); (ii) if v = vi for some i \xe2\x88\x88 {2, 3, . . . , k}, \n then v has detected a sequence that contains the cycle (v = vi, vi+1, . . . , vk, vk+1 = v); in this case, \n the sequence is discarded, since the cycle must have been detected in an earlier iteration (see line 11 of Algorithm 1); \n to be precise, this cycle must have been detected in iteration k \xe2\x88\x92 i + 1. Every cycle (v1, v2, . . . , vk, vk+1 = v1) \n is detected by all vi,i = 1 to k in the same iteration; it is reported by the vertex min{v1,...,vk} (see line 9 of Algorithm 1).\n The total number of iterations of the algorithm is the number of vertices in the longest path in the graph, plus a few more steps \n for deactivating the final vertices. During the analysis of the total number of iterations, we ignore the few extra iterations \n needed for deactivating the final vertices and detecting the end of the computation, since it is O(1).\n \n Pseudocode of the algorithm:\n ============================\n M(v): Message received from vertex v\n N+(v): all dst verties from v\n\n functionCOMPUTE(M(v)):\n if i=0 then:\n for each w \xe2\x88\x88 N+(v) do:\n send (v) to w \n else if M(v) = \xe2\x88\x85 then:\n deactivate v and halt \n else:\n for each (v1,v2,...,vk) \xe2\x88\x88 M(v) do:\n if v1 = v and min{v1,v2,...,vk} = v then:\n report (v1 = v,v2,...,vk,vk+1 = v)\n else if v not \xe2\x88\x88 {v2,...,vk} then:\n for each w \xe2\x88\x88 N+(v) do:\n send (v1,v2,...,vk,v) to w\n\n \n Scalablitiy of the algorithm:\n ============================\n the number of iteration depends on the path of the longest cycle\n the scaling it between \n O(log(n)) up to maxium O(n) where n=number of vertices\n so the number of iterations is less to max linear to the number of vertices, \n if there are more edges (parallel etc.) it will not affect the the runtime\n\n\n for more details please refer to the oringinal publication\n """\n\n\n _logger.warning("+++ find_cycles(): starting cycle search ...")\n \n # create emtpy dataframe to collect all cycles\n cycles = sqlContext.createDataFrame(sc.emptyRDD(),StructType([StructField("cycle",ArrayType(StringType()),True)]))\n\n # initialize the messege column with own source id \n init_vertices=(vertices\n .withColumn("message",f.array(f.col("id")))\n )\n \n init_edges=(edges\n .where(f.col("src")!=f.col("dst"))\n .select("src","dst")\n )\n \n # create graph object that will be update each iteration\n gx = GraphFrame(init_vertices, init_edges)\n\n # iterate until max_iter \n # max iter is used in case that the3 break condition is never reached during this time\n # defaul value=100.000\n for iter_ in range(max_iter):\n \n # message that should be send to destination for aggregation\n msgToDst = AM.src["message"]\n # aggregate all messages that where received into a python set (drops duplicate edges)\n agg = gx.aggregateMessages(\n f.collect_set(AM.msg).alias("aggMess"),\n sendToSrc=None,\n sendToDst=msgToDst)\n \n # BREAK condition: if no more messages are received all cycles where found \n # and we can quit the loop \n if(len(agg.take(1))==0):\n #print("THE END: All cycles found in " + str(iter_) + " iterations")\n break\n \n # apply the alorithm logic \n # filter for cycles that should be reported as found\n # compose new message to be send for next iteration\n # _column name stands for temporary columns that are only used in the algo and then dropped again\n checkVerties=(\n agg\n # flatten the aggregated message from [[2]] to [] in order to have proper 1D arrays\n .withColumn("_flatten1",f.explode(f.col("aggMess")))\n # take first element of the array\n .withColumn("_first_element_agg",f.element_at(f.col("_flatten1"), 1))\n # take minimum element of th array\n .withColumn("_min_agg",f.array_min(f.col("_flatten1")))\n # check if it is a cycle \n # it is cycle when v1 = v and min{v1,v2,...,vk} = v\n .withColumn("_is_cycle",f.when(\n (f.col("id")==f.col("_first_element_agg")) &\n (f.col("id")==f.col("_min_agg"))\n ,True)\n .otherwise(False)\n )\n # pick cycle that should be reported=append to cylce list\n .withColumn("_cycle_to_report",f.when(f.col("_is_cycle")==True,f.col("_flatten1")).otherwise(None))\n # sort array to have duplicates the same\n .withColumn("_cycle_to_report",f.sort_array("_cycle_to_report"))\n # create column where first array is removed to check if the current vertices is part of v=(v2,...vk)\n .withColumn("_slice",f.array_except(f.col("_flatten1"), f.array(f.element_at(f.col("_flatten1"), 1)))) \n # check if vertices is part of the slice and set True/False column\n .withColumn("_is_cycle2",f.lit(f.size(f.array_except(f.array(f.col("id")), f.col("_slice"))) == 0))\n )\n \n #print("checked Vertices")\n #checkVerties.show(truncate=False)\n # append found cycles to result dataframe via union\n cycles=(\n # take existing cycles dataframe\n cycles\n .union(\n # union=append all cyles that are in the current reporting column\n checkVerties\n .where(f.col("_cycle_to_report").isNotNull())\n .select("_cycle_to_report")\n )\n )\n\n # create list of new messages that will be send in the next iteration to the vertices\n newVertices=(\n checkVerties\n # append current vertex id on position 1\n .withColumn("message",f.concat(\n f.coalesce(f.col("_flatten1"), f.array()),\n f.coalesce(f.array(f.col("id")), f.array())\n ))\n # only send where it is no cycle duplicate\n .where(f.col("_is_cycle2")==False)\n .select("id","message")\n )\n\n print("vertics to send forward")\n newVertices.sort("id").show(truncate=False)\n \n # cache new vertices using workaround for SPARK-1334\n cachedNewVertices = AM.getCachedDataFrame(newVertices)\n\n # update graphframe object for next round\n gx = GraphFrame(cachedNewVertices, gx.edges)\n\n\n \n # materialize results and get number of found cycles\n #cycles_count=cycles.persist().count()\n\n _cycle_statistics=(\n cycles\n .withColumn("cycle_length",f.size(f.col("cycle")))\n .agg(f.count(f.col("cycle")),f.max(f.col("cycle_length")),f.min(f.col("cycle_length")))\n ).collect()\n\n cycle_statistics={"count":_cycle_statistics[0]["count(cycle)"],"max":_cycle_statistics[0]["max(cycle_length)"],"min":_cycle_statistics[0]["min(cycle_length)"]}\n \n end_time =time.time()\n _logger.warning("+++ find_cycles(): " + str(cycle_statistics["count"]) + " cycles found in " + str(iter_) + " iterations (min length=" + str(cycle_statistics["min"]) +", max length="+ str(cycle_statistics["max"]) +") in " + str(end_time-start_time) + " seconds")\n _logger.warning("+++ #########################################################################################")\n\n\n return cycles, cycle_statistics\nRun Code Online (Sandbox Code Playgroud)\n这个函数需要一个像这样的图表
\n简单循环:
\n
嵌套循环:
\n
SimpleCycle=[\n ("0","1"),\n ("1","2"),\n ("2","3"),\n ("3","4"),\n ("3","1")]\n\nNestedCycle=[\n ("1","2"),\n ("2","3"),\n ("3","4"),\n ("4","1"),\n ("3","1"),\n ("5","1"),\n ("5","2")]\n\nedges = sqlContext.createDataFrame(SimpleCycle,["src","dst"])\n\nvertices=edges.select("src").union(edges.select("dst")).distinct().distinct().withColumnRenamed(\'src\', \'id\') \n\nedges.show()\n# +---+---+\n# |src|dst|\n# +---+---+\n# | 1| 2|\n# | 2| 3|\n# | 3| 4|\n# | 4| 1|\n# | 3| 1|\n# | 5| 1|\n# | 5| 2|\n# +---+---+\n\n\nraw_cycles=find_cycles(sqlContext,sc,vertices,edges,max_iter=1000)\n\nraw_cycles.show()\n# +------------+\n# | cycle|\n# +------------+\n# | [1, 2, 3]|\n# |[1, 2, 3, 4]|\n#+------------+\n\n\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
2932 次 |
| 最近记录: |