标签: spark-graphx

Spark GraphX内存错误SparkListenerBus(java.lang.OutOfMemoryError:Java堆空间)

Apache Spark(Graphx)上的内存不足有问题.应用程序运行,但一段时间后关闭.我使用Spark 1.2.0.群集有足够的内存和多个内核.我没有使用GraphX的其他应用程序,运行没有问题.应用使用Pregel.

我在Hadoop YARN模式下提交申请:

HADOOP_CONF_DIR =/etc/hadoop/conf spark-submit --class DPFile --deploy-mode cluster --master yarn --num-executors 4 --driver-memory 10g --executor-memory 6g --executor-cores 8 - -files log4j.properties spark_routing_2.10-1.0.jar road_cr_big2 1000

Spark配置:

val conf = new SparkConf(true)
    .set("spark.eventLog.overwrite", "true")
    .set("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")
    .set("spark.yarn.applicationMaster.waitTries", "60")
    .set("yarn.log-aggregation-enable","true")
    .set("spark.akka.frameSize", "500") 
    .set("spark.akka.askTimeout", "600") 
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.akka.timeout","1000")
    .set("spark.akka.heartbeat.pauses","60000")
    .set("spark.akka.failure-detector.threshold","3000.0")
    .set("spark.akka.heartbeat.interval","10000")
    .set("spark.ui.retainedStages","100")
    .set("spark.ui.retainedJobs","100")
    .set("spark.driver.maxResultSize","4G")
Run Code Online (Sandbox Code Playgroud)

谢谢你的回答.

日志:

ERROR Utils: Uncaught exception in thread SparkListenerBus    
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
at java.lang.StringBuilder.append(StringBuilder.java:132)
at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
at …
Run Code Online (Sandbox Code Playgroud)

hadoop-yarn apache-spark spark-graphx

6
推荐指数
0
解决办法
903
查看次数

如何使用 Long 数据类型在 Apache Spark GraphX 中创建 VertexId?

我正在尝试使用一些可以在此处找到的 Google Web Graph 数据创建一个图表:

https://snap.stanford.edu/data/web-Google.html

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD



val textFile = sc.textFile("hdfs://n018-data.hursley.ibm.com/user/romeo/web-Google.txt")
val arrayForm = textFile.filter(_.charAt(0)!='#').map(_.split("\\s+")).cache()
val nodes = arrayForm.flatMap(array => array).distinct().map(_.toLong)
val edges = arrayForm.map(line => Edge(line(0).toLong,line(1).toLong))

val graph = Graph(nodes,edges)
Run Code Online (Sandbox Code Playgroud)

不幸的是,我收到此错误:

<console>:27: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[Long]
 required: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, ?)]
Error occurred in an application involving default arguments.
       val graph = Graph(nodes,edges)
Run Code Online (Sandbox Code Playgroud)

那么如何创建 VertexId 对象呢?根据我的理解,传递一个 Long 应该就足够了。

有任何想法吗?

非常感谢!

罗密欧

scala apache-spark spark-graphx

5
推荐指数
1
解决办法
5919
查看次数

使用HDFS在Spark Graphx中存储图形

我在Spark的GraphX中构建了一个图形.这个图表可能有10亿个节点和超过100亿个边缘,所以我不想一遍又一遍地构建这个图.

我希望能够构建一次,保存它(我认为最好的是HDFS),在它上面运行一些进程,然后在几天或几周内访问它,添加一些新的节点和边缘,然后运行还有更多的流程.

我怎么能在Apache Spark的GraphX中做到这一点?

编辑:我想我找到了一个潜在的解决方案,但我希望有人确认这是否是最好的方法.

如果我有一个图表,比方说graph,我必须将图形的顶点RDD和它的edgeRDD分别存储在文本文件中.然后,稍后,我可以访问这些文本文件,如下所示:

graph.vertices.saveAsTextFile(somePath)
graph.edges.saveAsTextFile(somePath)
Run Code Online (Sandbox Code Playgroud)

我现在面临的一个问题是:我应该使用saveAsTextFile()还是saveAsObjectFile()?然后我该如何在以后访问这些文件?

apache-spark spark-graphx

5
推荐指数
2
解决办法
3722
查看次数

在spark上没有有效的构造函数

这是我的代码:

class FNNode(val name: String)

case class Ingredient(override val name: String, category: String) extends FNNode(name)


val ingredients: RDD[(VertexId, FNNode)] = 
sc.textFile(PATH+"ingr_info.tsv").
      filter(! _.startsWith("#")).
      map(line => line.split('\t')).
      map(x => (x(0).toInt ,Ingredient(x(1), x(2))))
Run Code Online (Sandbox Code Playgroud)

当我定义这些变量时没有错误.但是,在尝试执行它时:

ingredients.take(1)
Run Code Online (Sandbox Code Playgroud)

我明白了

org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.InvalidClassException: $iwC$$iwC$Ingredient; no valid constructor
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
Run Code Online (Sandbox Code Playgroud)

根据这里的答案,这似乎与序列化问题有关.但是,如果它确实是序列化问题,我不知道如何解决这个问题.

我沿着代码以下本书通过他们的方式,所以我会认为这应该在某个时候已经至少工作过?

scala apache-spark spark-graphx

5
推荐指数
1
解决办法
2252
查看次数

许多人在Spark UI中跳过Pregel的阶段

我尝试运行connected componentslogNormalGraph.

val graph: Graph[Long, Int] = GraphGenerators.
    logNormalGraph(context.spark, numEParts = 10, numVertices = 1000000,
        mu = 0.01, sigma = 0.01)

val minGraph = graph.connectedComponents()
Run Code Online (Sandbox Code Playgroud)

在每个下一个工作的火花UI中,我可以看到不断增加的跳过阶段

1 - 4/4 (12 skipped)
2 - 4/4 (23 skipped)
...
50 - 4/4 (4079 skipped)
Run Code Online (Sandbox Code Playgroud)

当我在Pregel上运行某些东西以及为什么这个数字增长如此之快(非线性)时,为什么会有这么多跳过的阶段?

apache-spark spark-graphx

5
推荐指数
1
解决办法
916
查看次数

查找特定节点的连接组件而不是整个图(GraphFrame/GraphX)

我在 Spark 中创建了一个 GraphFrame,该图目前如下所示:

基本上,会有很多这样的子图,其中每个子图都将彼此断开。给定一个特定的节点 ID,我想在子图中找到所有其他节点。例如,如果给定节点 ID 1,则图将遍历并返回 2,10,20,3,30。

我创建了一个主题,但它没有给出正确的结果。

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()
Run Code Online (Sandbox Code Playgroud)

不幸的是,连通分量函数考虑了整个图。是否可以使用GraphFrame/GraphX在给定特定节点 ID 的情况下获取断开连接的子图中的所有节点?

apache-spark spark-graphx spark-dataframe graphframes

5
推荐指数
1
解决办法
1950
查看次数

如何处理Apache Spark中集群节点之间要独立处理的不同图形文件?

?假设我有大量的图形文件,每个图形大约有 500K 条边。我一直在 Apache Spark 上处理这些图形文件,我想知道如何有效地并行化整个图形处理作业。由于现在每个图形文件都是独立的,我正在寻找与文件的并行性。所以,如果我有 100 个图形文件,我有 20 个节点集群,我可以处理每个节点上的每个文件,所以每个节点将处理 5 个文件。现在,正在发生的事情就像是在多个阶段中处理单个图,这导致了很多改组。

graphFile = "/mnt/bucket/edges" #This directory has 100 graph files each file with around 500K edges

nodeFile = "/mnt/bucket/nodes" #This directory has node files

graphData = sc.textFile(graphFile).map(lambda line: line.split(" ")).flatMap(lambda edge: [(int(edge[0]),int(edge[1]))])

graphDataFrame = sqlContext.createDataFrame(graphData, ['src', 'dst']).withColumn("relationship", lit('edges')) # Dataframe created so as to work with Graphframes

nodeData = sc.textFile(nodeFile).map(lambda line: line.split("\s")).flatMap(lambda edge: [(int(edge[0]),)])

nodeDataFrame = sqlContext.createDataFrame(nodeData, ['id'])

graphGraphFrame = GraphFrame(nodeDataFrame, graphDataFrame)

connectedComponent = graphGraphFrame.connectedComponents()
Run Code Online (Sandbox Code Playgroud)

问题是它需要花费大量时间来处理甚至几个文件。我必须处理像 20K 的文件。每个文件有 80 …

dataframe apache-spark apache-spark-sql spark-graphx graphframes

5
推荐指数
1
解决办法
321
查看次数

使用Spark的Graphx中最短路径性能

我正在从和类型的gz压缩json文件中创建图形。edgevertices

我已将文件放在此处的保管箱文件夹中

我加载并映射这些json记录以创建所需的verticesedge类型,graphx如下所示:

val vertices_raw = sqlContext.read.json("path/vertices.json.gz")
val vertices = vertices_raw.rdd.map(row=> ((row.getAs[String]("toid").stripPrefix("osgb").toLong),row.getAs[Long]("index")))
val verticesRDD: RDD[(VertexId, Long)] = vertices
val edges_raw = sqlContext.read.json("path/edges.json.gz")
val edgesRDD = edges_raw.rdd.map(row=>(Edge(row.getAs[String]("positiveNode").stripPrefix("osgb").toLong, row.getAs[String]("negativeNode").stripPrefix("osgb").toLong, row.getAs[Double]("length"))))
val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).partitionBy(PartitionStrategy.RandomVertexCut)
Run Code Online (Sandbox Code Playgroud)

然后,我使用dijkstra发现的这种实现来计算两个顶点之间的最短路径:

def dijkstra[VD](g: Graph[VD, Double], origin: VertexId) = {
          var g2 = g.mapVertices(
        (vid, vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
          )
          for (i …
Run Code Online (Sandbox Code Playgroud)

json apache-spark spark-graphx

5
推荐指数
1
解决办法
617
查看次数

Graphframes:spark graphframes 中两个顶点列表之间的 BFS

我的目标是确定两个顶点之间的最大路径长度是否<= 4。

我有一个图形数据框和一个以下格式的测试文件。

我正在尝试从图形数据帧的 bfs 函数获取输出列(OP)。

Col1, Col2, OP
a1,   a4,   true
a2,   a1,   false
a3,   a5,   true
Run Code Online (Sandbox Code Playgroud)

目前,我正在循环每一行并应用 bfs,如下所示

gf.bfs.fromExpr("id = 'a1'").toExpr("id = 'a4'").maxPathLength(4).run()
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法可以直接插入源和目标处的顶点列表来计算图框中的 bfs。

apache-spark spark-graphx graphframes

5
推荐指数
0
解决办法
647
查看次数

如何在 Spark Scala 中使用 Graph.fromEdgeTuples 从 CSV 文件创建图表

Spark我是和的新手Scala,我正在尝试执行一项简单的任务,即根据文本文件中的数据创建图形。

从文档中

https://spark.apache.org/docs/0.9.0/api/graphx/index.html#org.apache.spark.graphx.Graph $@fromEdges[VD,ED]%28RDD[Edge[ED]], VD%29%28ClassTag[VD],ClassTag[ED]%29:图表[VD,ED]

我可以看到我可以创建一个图表tuples of vertices

我的简单文本文件如下所示,其中每个数字都是一个顶点:

v1 v3
v2 v1
v3 v4
v4
v5 v3
Run Code Online (Sandbox Code Playgroud)

当我从文件中读取数据时

val myVertices = myData.map(line=>line.split(" ")) 我得到一个 RDD[Array[String]]。

我的问题是:

  1. 如果这是解决问题的正确方法,我如何将其转换RDD[Array[String]]为正确的格式,根据文档是RDD[(VertexId, VertexId)](也VertexID必须是长类型,并且我正在使用字符串)

  2. 是否有另一种更简单的方法可以从类似的 csv 文件结构构建图表?

任何建议都将非常受欢迎。谢谢!

csv scala apache-spark rdd spark-graphx

4
推荐指数
1
解决办法
6052
查看次数