Spark-GraphX-缩放连接的组件

Shi*_*mar 6 apache-spark connected-components spark-graphx

我正在尝试使用连接的组件,但缩放时遇到问题。我的这里就是我所拥有的-

// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache

// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache

// create graph  
val identityGraph = Graph(vertices, edges)

// get connected components
val cc = identityGraph.connectedComponents.vertices
Run Code Online (Sandbox Code Playgroud)

在哪里,GraphUtil具有帮助程序功能以返回顶点和边。在这一点上,我的图有大约100万个节点和200万个边(顺便说一句,预计将增长到1亿个节点)。我的图非常稀疏,因此我希望有很多小图。

当我执行上述操作时,我会不断得到java.lang.OutOfMemoryError: Java heap space。我已经尝试executor-memory 32g并运行了15个节点的集群,其纱线容器大小为45g。

这是异常详细信息:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
Run Code Online (Sandbox Code Playgroud)

另外,我得到了很多以下日志:

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes
Run Code Online (Sandbox Code Playgroud)

我的问题是有人尝试过这种规模的ConnectedComponents吗?如果是,我在做什么错?

Shi*_*mar 6

正如我在上面的评论中发布的那样,我在 Spark 上使用 map/reduce 实现了连接组件。您可以在此处找到更多详细信息 - https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar和 MIT 许可下的源代码在此处 - https://github .com/kwartile/connected-component


Wil*_*ton 4

连接组件算法的扩展性不是很好,其性能在很大程度上取决于图的拓扑。边缘的稀疏并不意味着组件很小。一长串边非常稀疏(边数 = 顶点数 - 1),但 GraphX 中实现的强力算法不会非常有效(请参阅ccpregel的来源)。

您可以尝试以下方法(已排序,仅代码):

  1. 检查镶木地板(在磁盘上)中的顶点和边,然后再次加载它们以构建图形。当执行计划变得太大时,缓存有时并不能减少它。
  2. 以保持算法结果不变的方式转换图表。例如,您可以在代码中看到算法正在两个方向传播信息(默认情况下应该如此)。因此,如果您有多个连接相同两个顶点的边,请从应用算法的图中将它们过滤掉。
  3. 自己优化 GraphX 代码(实际上非​​常简单),使用节省内存的通用优化(即每次迭代时在磁盘上设置检查点以避免 OOM)或特定于域的优化(类似于第 2 点)

如果您可以放弃 GraphX(它正在变得有些遗留),您可以考虑 GraphFrames(博客 )。我没试过,不知道有没有CC。

我确信您可以在 Spark 包中找到其他可能性,但也许您甚至想使用 Spark 之外的东西。但这超出了问题的范围。

祝你好运!