如何从graphx中的元组构建图形并在之后标记节点?

Sté*_*e C 6 serialization scala graph apache-spark

这里可以找到一些上下文,我的想法是我已经从Hive表上的请求中收集的元组创建了一个图形.这些对应于国家之间的贸易关系.以这种方式构建图形后,顶点未标记.我想学习学位分布并获得最相关国家的名字.我尝试了两个选项:

  • 第一:我尝试用顶点的字符串名称映射顶点的索引,函数idMapbis在函数内部,该函数收集并打印十个顶部连接度.
  • 第二:我试图将标签添加到图形本身的顶点.

在这两种情况下,我都会收到以下错误:任务不可序列化

全球代码:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays 
Run Code Online (Sandbox Code Playgroud)

情侣看起来像这样:数组[(任何,任何)] =数组((MWI,MOZ),(WSM,AUS),(MDA,CRI),(KNA,HTI),(PER,ERI),(SWE,CUB) ),...

val idMap = sc.broadcast(couples 
.flatMap{case (x: String, y: String) => Seq(x, y)}
.distinct 
.zipWithIndex  
.map{case (k, v) => (k, v.toLong)}  
.toMap) 

val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
.map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})

val graph = Graph.fromEdgeTuples(edges, 1)
Run Code Online (Sandbox Code Playgroud)

以这种方式构建,顶点看起来像(68,1)

val degrees: VertexRDD[Int] = graph.degrees.cache()

//Most connected vertices 
def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, k) => (id.toInt, degree)}
val ord = Ordering.by[(Int, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
Run Code Online (Sandbox Code Playgroud)

我们得到:(79,1016),(64,912),(55,889)......

检索名称的第一个选项:

val idMapbis = sc.parallelize(couples
.flatMap{case (x: String, y: String) => Seq(x, y)} 
.distinct 
.zipWithIndex  
.map{case (k, v) => (v,k)}  
.toMap)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (idMapbis.value(id.toInt), degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
Run Code Online (Sandbox Code Playgroud)

该任务不可序列化,但函数idMapbis正在工作,因为idMapbis.value没有错误(graph.vertices.take(1)(0)._ 1.toInt)

选项2:

graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}
Run Code Online (Sandbox Code Playgroud)

该任务不再可序列化(对于上下文,这里是如何修改topNamesAndDegrees以获取此选项中连接最多的顶点的名称)

def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
val namesAndDegrees = degrees.innerJoin(graph.vertices) {
 (id, degree, name) => (name, degree)}
val ord = Ordering.by[(String, Int), Int](_._2)
namesAndDegrees.map(_._2).top(10)(ord)}
topNamesAndDegrees(degrees, graph).foreach(println)
Run Code Online (Sandbox Code Playgroud)

我有兴趣了解如何改进这个选项之一,如果有人知道如何,也许两者都有.

zer*_*323 4

您的尝试存在的问题是,这idMapbis是一个RDD. 由于我们已经知道您的数据适合内存,因此您可以像以前一样简单地使用广播变量:

val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
graph.mapVertices{case (id, _) => idMapRev.value(id)}
Run Code Online (Sandbox Code Playgroud)

或者,您可以从一开始就使用正确的标签:

val countries: RDD[(VertexId, String)] = sc
  .parallelize(idMap.value.map(_.swap).toSeq)

val relationships: RDD[Edge[Int]] = sc.parallelize(couples
 .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)}
)

val graph = Graph(countries, relationships)
Run Code Online (Sandbox Code Playgroud)

第二种方法有一个重要的优点 - 如果图很大,您可以相对轻松地用连接替换广播变量。