标签: graphframes

使用 Pyspark 从关系数据集构建层次结构

我是 Python 新手,一直致力于从关系数据集构建层次结构。
如果有人知道如何进行此操作,那将有巨大的帮助。

我有一个关系数据集,其中包含如下数据

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  
Run Code Online (Sandbox Code Playgroud)

很快。我正在寻找一些 python 或 pyspark 代码来
构建如下所示的层次结构数据框

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  
Run Code Online (Sandbox Code Playgroud)

这些数据是字母数字,是一个巨大的数据集[约 5000 万条记录]。
此外,层次结构的根是已知的,并且可以在代码中硬连线。
因此,在上面的示例中,层次结构的根是“root”。

python hierarchy apache-spark pyspark graphframes

9
推荐指数
1
解决办法
4999
查看次数

使用Spark Graphframes进行分区

我正在使用Spark Graphframes处理一个较大的(?)图形(6000万个顶点和95亿个边缘).基础数据不大 - 磁盘上的顶点约为500mb,边缘约为40gb.由于java堆内存不足问题,我的容器经常关闭,但我认为底层问题是graphframe一直在不停地调整数据(我看到shuffle读/写高达150gb).有没有办法有效地划分Graphframe或底层边/顶点以减少shuffle?

apache-spark graphframes

8
推荐指数
1
解决办法
1081
查看次数

PySpark GraphFrame 的正确子图

graphframes是一个基于PySpark DataFrames的网络分析工具。以下代码是教程子图示例的修改版本:

from graphframes.examples import Graphs
import graphframes
g = Graphs(sqlContext).friends()  # Get example graph
# Select subgraph of users older than 30
v2 = g.vertices.filter("age > 30")
g2 = graphframes.GraphFrame(v2, g.edges)
Run Code Online (Sandbox Code Playgroud)

人们会期望与原始图相比,新图g2将包含更少的节点和边g。然而,这种情况并非如此:

print(g.vertices.count(), g.edges.count())
print(g2.vertices.count(), g2.edges.count())
Run Code Online (Sandbox Code Playgroud)

给出输出:

(6, 7)
(7, 4)
Run Code Online (Sandbox Code Playgroud)

很明显,结果图包含不存在节点的边。更令人不安的是g.degreesg2.degrees是相同的。这意味着至少某些图形功能会忽略节点信息。有没有一种好方法可以确保仅使用提供的参数和参数GraphFrame的交集来创建图形?nodesedges

python pyspark graphframes

7
推荐指数
1
解决办法
2961
查看次数

使用 GraphFrames Spark 在加权有向图中查找最短路径

spark的graphFrames包很棒。我可以使用命令找到从“a”到“d”的最短路径

val results = g.shortestPaths.landmarks(Seq("a", "d")).run()
Run Code Online (Sandbox Code Playgroud)

但是如何定义加权图并计算两个节点之间的最短路径?

谢谢。

shortest-path graphframes weighted-graph

6
推荐指数
0
解决办法
605
查看次数

Graphframes: py4j.protocol.Py4JJavaError: 调用 o100.createGraph 时出错

我使用 Spark 2.4.4 运行一个简单的 EMR 集群,我想使用 graphframes v0.7 来运行以下代码:

from pyspark import *
from pyspark.sql import *
from graphframes import *


sc= SparkContext().getOrCreate()
sc.setLogLevel("ERROR")
spark = SparkSession.builder.appName('graphFrames').getOrCreate()
spark.sparkContext.addPyFile("/home/hadoop/jars/graphframes.zip")

vertices = spark.createDataFrame([('1', 'Carter', 'Derrick', 50),
                                  ('2', 'May', 'Derrick', 26),
                                 ('3', 'Mills', 'Jeff', 80),
                                  ('4', 'Hood', 'Robert', 65),
                                  ('5', 'Banks', 'Mike', 93),
                                 ('98', 'Berg', 'Tim', 28),
                                 ('99', 'Page', 'Allan', 16)],
                                 ['id', 'name', 'firstname', 'age'])
edges = spark.createDataFrame([('1', '2', 'friend'),
                               ('2', '1', 'friend'),
                              ('3', '1', 'friend'),
                              ('1', '3', 'friend'),
                               ('2', '3', 'follows'),
                               ('3', …
Run Code Online (Sandbox Code Playgroud)

amazon-emr apache-spark pyspark graphframes

6
推荐指数
0
解决办法
623
查看次数

在 kubernetes 上安装 PySpark 软件包时出现 Spark-Submit: ivy-cache file not found 错误

我一整天都在与它斗争。我能够安装并使用带有 Spark shell 或连接的 Jupiter 笔记本的包(graphframes),但我想使用 Spark-Submit 将其移动到基于 kubernetes 的 Spark 环境。我的spark版本:3.0.1 我从spark-packages下载了最后一个可用的.jar文件(graphframes-0.8.1-spark3.0-s_2.12.jar)并将其放入jars文件夹中。我使用标准 Spark docker 文件的变体来构建我的图像。我的 Spark-submit 命令如下所示:

$SPARK_HOME/bin/spark-submit \
--master k8s://https://kubernetes.docker.internal:6443 \
--deploy-mode cluster \
--conf spark.executor.instances=$2 \
--conf spark.kubernetes.container.image=myimage.io/repositorypath \
--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 \
--jars "local:///opt/spark/jars/graphframes-0.8.1-spark3.0-s_2.12.jar" \
path/to/my/script/script.py
Run Code Online (Sandbox Code Playgroud)

但它以错误结束

Ivy Default Cache set to: /opt/spark/.ivy2/cache
The jars for the packages stored in: /opt/spark/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e833e157-44f5-4055-81a4-3ab524176ef5;1.0
    confs: [default]
Exception in …
Run Code Online (Sandbox Code Playgroud)

ivy apache-spark pyspark graphframes spark-submit

6
推荐指数
2
解决办法
7917
查看次数

可变长度图案 GraphFrames

我正在尝试使用 GraphFrames 查找从节点 A 到节点 B 且路径长度 < 10 的所有路径。我可以使用以下代码来做到这一点,但是,想知道是否有更好的方法来做到这一点。

val graph = GraphFrame(vertices, edges)


val motif1 = graph.find("(start)-[]->(d1)").select($"start.id".as("start_id"), $"d1.id".as("end_id"))
val motif2 = graph.find("(start)-[]->(d1); (d1)-[]->(d2)").select($"start.id".as("start_id"), $"d2.id".as("end_id"))
val motif3 = graph.find("(start)-[]->(d1); (d1)-[]->(d2); (d2)-[]->(d3)").select($"start.id".as("start_id"), $"d3.id".as("end_id"))
val motif4 = graph.find("(start)-[]->(d1); (d1)-[]->(d2); (d2)-[]->(d3); (d3)-[]->(d4)").select($"start.id".as("start_id"), $"d4.id".as("end_id"))
val motif5 = graph.find("(start)-[]->(d1); (d1)-[]->(d2); (d2)-[]->(d3); (d3)-[]->(d4)  ; (d4)-[]->(d5) ").select($"start.id".as("start_id"), $"d5.id".as("end_id"))
val motif6 = graph.find("(start)-[]->(d1); (d1)-[]->(d2); (d2)-[]->(d3); (d3)-[]->(d4)  ; (d4)-[]->(d5) ;  (d5)-[]->(d6)").select($"start.id".as("start_id"), $"d6.id".as("end_id"))
val motif7 = graph.find("(start)-[]->(d1); (d1)-[]->(d2); (d2)-[]->(d3); (d3)-[]->(d4) ; (d4)-[]->(d5) ;  (d5)-[]->(d6) ;  (d6)-[]->(d7) ").select($"start.id".as("start_id"), $"d7.id".as("end_id"))
val …
Run Code Online (Sandbox Code Playgroud)

graph apache-spark graphframes

5
推荐指数
1
解决办法
690
查看次数

没有名为graphframes的模块Jupyter Notebook

我正在遵循安装指南,但是在使用时遇到以下问题graphframes

from pyspark import SparkContext
sc =SparkContext()
!pyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
from graphframes import *
Run Code Online (Sandbox Code Playgroud)

-------------------------------------------------- ------------------------- ImportError Traceback(最近一次通话最近)在()----> 1从graphframes import *

ImportError:没有名为graphframes的模块

我不确定是否可以通过以下方式安装软件包。但我会感谢您的建议和帮助。

python apache-spark graphframes

5
推荐指数
2
解决办法
1823
查看次数

Graphframes:spark graphframes 中两个顶点列表之间的 BFS

我的目标是确定两个顶点之间的最大路径长度是否<= 4。

我有一个图形数据框和一个以下格式的测试文件。

我正在尝试从图形数据帧的 bfs 函数获取输出列(OP)。

Col1, Col2, OP
a1,   a4,   true
a2,   a1,   false
a3,   a5,   true
Run Code Online (Sandbox Code Playgroud)

目前,我正在循环每一行并应用 bfs,如下所示

gf.bfs.fromExpr("id = 'a1'").toExpr("id = 'a4'").maxPathLength(4).run()
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法可以直接插入源和目标处的顶点列表来计算图框中的 bfs。

apache-spark spark-graphx graphframes

5
推荐指数
0
解决办法
647
查看次数

GraphFrames:合并具有相似列值的边缘节点

tl; dr:您如何简化图形,删除具有相同name值的边节点?

我有一个图定义如下:

import graphframes
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
vertices = spark.createDataFrame([
    ('1', 'foo', '1'),
    ('2', 'bar', '2'),
    ('3', 'bar', '3'),
    ('4', 'bar', '5'),
    ('5', 'baz', '9'),
    ('6', 'blah', '1'),
    ('7', 'blah', '2'),
    ('8', 'blah', '3')
], ['id', 'name', 'value'])

edges = spark.createDataFrame([
    ('1', '2'),
    ('1', '3'),
    ('1', '4'),
    ('1', '5'),
    ('5', '6'),
    ('5', '7'),
    ('5', '8')
], ['src', 'dst'])

f = graphframes.GraphFrame(vertices, edges)
Run Code Online (Sandbox Code Playgroud)

这会生成一个看起来像这样的图(其中数字代表顶点 ID): 图形

从顶点 ID 等于 开始1,我想简化图形。这样具有相似name值的节点合并为一个节点。结果图看起来像这样:

图形

请注意我们如何只有一个 …

hadoop graph apache-spark pyspark graphframes

5
推荐指数
1
解决办法
452
查看次数