标签: spark-graphx

在 Spark GraphX 中寻找最大边权重

假设我有一个边属性双值的图,我想找到我的图的最大边权重。如果我这样做:

val max = sc.accumulator(0.0) //max holds the maximum edge weight
g.edges.distinct.collect.foreach{ e => if (e.attr > max.value) max.value
= e.attr }
Run Code Online (Sandbox Code Playgroud)

我想问一下在master上做了多少工作,在executors上做了多少,因为我知道collect()方法把整个RDD带到了master上?是否发生并行?有没有更好的方法来找到最大边缘权重?

笔记:

g.edges.distinct.foreach{ e => if (e.attr > max.value) max.value =
e.attr } // does not work without the collect() method.
//I use an accumulator because I want to use the max edge weight later
Run Code Online (Sandbox Code Playgroud)

如果我想对两个图之间具有相同 srcId 和 dstId 的边的属性应用一些平均函数,最好的方法是什么?

scala apache-spark spark-graphx

4
推荐指数
1
解决办法
682
查看次数

Titan和Spark-GraphX有什么区别,哪一个是首选?

我正在寻找Titan和Spark-GraphX之间的区别,哪一个最好用.我用Google搜索了,但没有得到关于此的文章

有人可以提供指针吗?

graph-databases titan spark-graphx

4
推荐指数
1
解决办法
2907
查看次数

更新 graphx 中的边权重

我在玩graphx。我已经建立了一个图表,我正在尝试更新关系的权重,

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
def pageHash(title:String )  = title.toLowerCase.replace(" ","").hashCode.toLong


val vertexArray = Array(
  (pageHash("Alice"), ("Alice")),
(pageHash("Bob"), ("Bob")),
(pageHash("Charlie"), ("Charlie")),
(pageHash("David"), ("David")),
(pageHash("Ed"), ("Ed")),
(pageHash("Fran"), ("Fran"))
)     
val edgeArray = Array(
 Edge(pageHash("Bob"), pageHash("Alice"), 7),
 Edge(pageHash("Bob"), pageHash("David"), 2),
Edge(pageHash("Charlie"), pageHash("Bob"), 4),
Edge(pageHash("Charlie"), pageHash("Fran"), 3),
Edge(pageHash("David"), pageHash("Alice"), 1),
Edge(pageHash("Ed"), pageHash("Bob"), 2),
Edge(pageHash("Ed"), pageHash("Charlie"), 8),
Edge(pageHash("Ed"), pageHash("Fran"), 3)
)    


val vertexRDD: RDD[(Long, (String))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String), Int] = Graph(vertexRDD, edgeRDD)

graph.triplets.filter(triplet => triplet.srcAttr.equals("Bob")&&triplet.dstAttr.equals("Alice")).collect()

graph.triplets.filter(triplet => …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-graphx

4
推荐指数
1
解决办法
1176
查看次数

在 Spark 中为每个 Executor 创建数组并组合成 RDD

我正在从基于 MPI 的系统转向 Apache Spark。我需要在 Spark 中执行以下操作。

假设,我有n顶点。我想从这些n顶点创建一个边列表。边只是两个整数 (u,v) 的元组,不需要属性。

但是,我想在每个执行程序中独立地并行创建它们。因此,我想PPSpark Executors独立创建边缘数组。每个数组可能有不同的大小并取决于顶点,因此,我还需要从0to的执行者 ID n-1。接下来,我想要一个全局 RDD 边缘数组。

在 MPI 中,我将使用处理器等级在每个处理器中创建一个数组。我如何在 Spark 中做到这一点,尤其是使用GraphX库?

因此,我的主要目标是在每个执行器中创建一组边并将它们组合成一个 RDD。

我首先尝试了鄂尔多斯的一个修改版本——仁义模型。作为参数,我只有节点数 n 和概率 p。

假设,执行程序i必须处理从101到 的节点200。对于任何节点,例如 node 101,它将以概率 p从101到创建边102 -- n。在每个执行程序创建分配的边后,我将实例化 GraphXEdgeRDDVertexRDD. 因此,我的计划是在每个执行器中独立创建边缘列表,并将它们合并到RDD.

scala apache-spark spark-graphx

3
推荐指数
1
解决办法
765
查看次数

在 Spark 中使用 Windows 函数进行每周聚合

我有从 2017 年 1 月 1 日到 2017 年 1 月 7 日的数据,这是每周想要的一周汇总数据。我以下列方式使用窗口函数

val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
      .agg(sum("Value") as "aggregate_sum")
      .select("window.start", "window.end", "aggregate_sum")
Run Code Online (Sandbox Code Playgroud)

我在数据框中有数据作为

    DateTime,value
    2017-01-01T00:00:00.000+05:30,1.2
    2017-01-01T00:15:00.000+05:30,1.30
--
    2017-01-07T23:30:00.000+05:30,1.43
    2017-01-07T23:45:00.000+05:30,1.4
Run Code Online (Sandbox Code Playgroud)

我得到的输出为:

2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
Run Code Online (Sandbox Code Playgroud)

它显示我的一天是从 2016 年 12 月 29 日开始,但实际数据是从 2017 年 1 月 1 日开始,为什么会出现这种保证金?

scala dataframe apache-spark spark-graphx

3
推荐指数
1
解决办法
4034
查看次数

如何使用 pyspark graphframe pregel API 实现循环检测

我正在尝试使用 Pyspark 和 graphframes 的 pregel 包装器来实现 Rocha & Thatte 的算法(http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf )。在这里,我遇到了消息聚合的正确语法问题。

这个想法是直截了当的:

...在每次传递中,G 的每个活动顶点都会向其外围邻居发送一组顶点序列,如下所述。在第一遍中,每个顶点 v 向其所有邻居发送消息 (v)。在后续迭代中,每个活动顶点 v 将 v 附加到它在上一次迭代中接收到的每个序列。然后它将所有更新的序列发送到其外围邻居。如果 v 在上一次迭代中没有收到任何消息,则 v 将自行停用。当所有顶点都已停用时,算法终止。...

我的想法是将顶点 id 发送到目标顶点(dst),并在聚合函数中将它们收集到一个列表中。然后,在我的顶点列“序列”中,我想将这个新列表项与现有列表项追加/合并,然后使用 when 语句检查当前顶点 id 是否已在序列中。然后我可以根据顶点列将顶点设置为 true 以将它们标记为循环。但我在 Spark 中找不到关于如何连接它的正确语法。有人有想法吗?或者实施类似的东西?

我当前的代码

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when

from graphframes import GraphFrame
from graphframes.lib import *



SimpleCycle=[
    ("1","2"),
    ("2","3"),
    ("3","4"),
    ("4","5"),
    ("5","2"),
    ("5","6") …
Run Code Online (Sandbox Code Playgroud)

graph-theory spark-graphx pyspark graphframes pregel

3
推荐指数
1
解决办法
2932
查看次数

GraphX是否支持同一图表中的不同类型的顶点?

我想知道我可以用不同类型的顶点建模GraphX图吗?说我有以下实体:产品,买家,卖家.我想形成一个图形结构,将这些实体作为顶点.(例如:以图形方式显示卖方出售并由买方购买的产品.)是否可以使用GraphX,如果可以,如何?谢谢!

spark-graphx

2
推荐指数
1
解决办法
708
查看次数

GraphX - 加权最短路径实现 - java.lang.NoSuchMethodError

编辑 - 我发现这本书是为scala写的,1.6但剩下的就是2.11.

我正在尝试实现Michael Malak和Robin East的Spark GraphX in Action书中的加权最短路径算法.有问题的部分是清单6.4"执行使用面包屑的最短路径算法",这里是第6章.

我有自己的图表,我是从两个RDD创建的.有344436顶点和772983边.我可以使用原生GraphX库执行未加权的最短路径计算,我对图形构造很有信心.

在这种情况下,我使用他们的Dijkstra实现如下:

val my_graph: Graph[(Long),Double] = Graph.apply(verticesRDD, edgesRDD).cache()

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
  var g2 = g.mapVertices(
      (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue, List[VertexId]())
  )

  for (i <- 1L to g.vertices.count-1) {
    val currentVertexId = g2.vertices
      .filter(!_._2._1)
      .fold((0L, (false, Double.MaxValue, List[VertexId]())))(
          (a,b) => if (a._2._2 < b._2._2) a else b)
      )
      ._1

    val newDistances = …
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark spark-graphx

2
推荐指数
1
解决办法
818
查看次数

在pyspark中使用graphX

是否有用于 GraphX 的 Python API?我遇到了 Scala API,但我想知道是否可以在 PySpark 中使用 GraphX 功能。

apache-spark spark-graphx pyspark

2
推荐指数
1
解决办法
3524
查看次数

如何从数据框构建图形?(GraphX)

我是 Scala 和 Spark 的新手,我需要从数据框构建一个图表。这是我的数据框的结构,其中 S 和 O 是节点,列 P 表示边。

+---------------------------+---------------------+----------------------------+
|S                          |P                    |O                           |
+---------------------------+---------------------+----------------------------+
|http://website/Jimmy_Carter|http://web/name      |James Earl Carter           |
|http://website/Jimmy_Car   |http://web/country   |http://website/United_States|
|http://website/Jimmy_Car   |http://web/birthPlace|http://web/Georgia_(US)     |
+---------------------------+---------------------+----------------------------+
Run Code Online (Sandbox Code Playgroud)

这是数据框的代码,我想从数据框“dfA”创建一个图形

 val test = sc
     .textFile("testfile.ttl")
     .map(_.split(" "))
     .map(p => Triple(Try(p(0).toString()).toOption,
                      Try(p(1).toString()).toOption,
                      Try(p(2).toString()).toOption))
     .toDF()

  val url_regex = """^(?:"|<{1}\s?)(.*)(?:>(?:\s\.)?|,\s.*)$"""
  val dfA = test
      .withColumn("Subject", regexp_extract($"Subject", url_regex, 1))
      .withColumn("Predicate", regexp_extract($"Predicate", url_regex, 1))
      .withColumn("Object", regexp_extract($"Object", url_regex, 1))
Run Code Online (Sandbox Code Playgroud)

scala graph dataframe apache-spark spark-graphx

2
推荐指数
1
解决办法
3069
查看次数