I created a GraphFrame in Spark, and the graph currently looks like this:

Basically, there will be many such subgraphs, each disconnected from the others. Given a particular node ID, I want to find all the other nodes in its subgraph. For example, given node ID 1, the graph would be traversed and return 2, 10, 20, 3, 30.
I built a motif query, but it doesn't give the correct result.
testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()
Unfortunately, the connected components function considers the entire graph. Is it possible, given a particular node ID, to get all the nodes of its disconnected subgraph using GraphFrames/GraphX?
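For reference, the standard GraphFrames route is still connectedComponents: run it once, look up the component ID that node 1 landed in, and filter the vertex DataFrame to that component. The membership semantics being asked for can be sketched in plain Python, without Spark (a minimal illustration, not the GraphFrames API):

```python
from collections import defaultdict, deque

def component_of(edges, start):
    """Collect every node reachable from `start`, treating edges as
    undirected -- i.e. all nodes in the same disconnected subgraph."""
    adj = defaultdict(set)
    for src, dst in edges:
        adj[src].add(dst)
        adj[dst].add(src)  # ignore direction for membership
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nbr in adj[node] - seen:
            seen.add(nbr)
            queue.append(nbr)
    return seen - {start}

# Edge list matching the question's example; (99, 100) is a separate subgraph.
edges = [(1, 2), (1, 3), (2, 10), (2, 20), (3, 30), (99, 100)]
print(sorted(component_of(edges, 1)))  # → [2, 3, 10, 20, 30]
```

In GraphFrames terms the equivalent is: take the connectedComponents output, find the component value of the row with id 1, then select all other ids sharing that component value.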
Suppose I have a large number of graph files, each with around 500K edges. I've been processing these graph files on Apache Spark, and I'd like to know how to parallelize the whole graph-processing job efficiently. Since each graph file is independent, I'm looking for parallelism at the file level. So if I have 100 graph files and a 20-node cluster, each node can process its own files, i.e. 5 files per node. What's happening now instead is that a single graph is processed in multiple stages, which causes a lot of shuffling.
graphFile = "/mnt/bucket/edges" #This directory has 100 graph files each file with around 500K edges
nodeFile = "/mnt/bucket/nodes" #This directory has node files
graphData = sc.textFile(graphFile).map(lambda line: line.split(" ")).flatMap(lambda edge: [(int(edge[0]),int(edge[1]))])
graphDataFrame = sqlContext.createDataFrame(graphData, ['src', 'dst']).withColumn("relationship", lit('edges')) # Dataframe created so as to work with Graphframes
nodeData = sc.textFile(nodeFile).map(lambda line: line.split()).flatMap(lambda node: [(int(node[0]),)]) # split() splits on any whitespace; split("\s") would split on the literal string "\s"
nodeDataFrame = sqlContext.createDataFrame(nodeData, ['id'])
graphGraphFrame = GraphFrame(nodeDataFrame, graphDataFrame)
connectedComponent = graphGraphFrame.connectedComponents()
The problem is that it takes a lot of time to process even a few files, and I have to process something like 20K files. Each file has 80 …
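Since the files are independent, one commonly suggested pattern (an assumption, not something tested on this exact workload) is to parallelize over file paths rather than edges: sc.parallelize(file_list, len(file_list)) with a map function that loads and processes one whole file on the executor, so no cross-file shuffle ever happens. The file-to-worker assignment idea itself, sketched in plain Python:

```python
def assign_round_robin(files, n_workers):
    """Deal independent graph files out to workers round-robin, so each
    worker handles whole files and no cross-file shuffling is needed."""
    buckets = [[] for _ in range(n_workers)]
    for i, path in enumerate(files):
        buckets[i % n_workers].append(path)
    return buckets

# Hypothetical file names: 100 files spread over a 20-node cluster.
files = ["graph_%03d.edges" % i for i in range(100)]
buckets = assign_round_robin(files, 20)
print([len(b) for b in buckets[:3]])  # → [5, 5, 5]
```

On Spark the per-file work would then run inside the mapped function with a single-machine graph library, keeping each 500K-edge graph on one executor instead of spreading it across stages.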
I'm trying to run the following code, which makes use of GraphFrames, but I'm now hitting an error that, after several hours of googling, I haven't been able to resolve. It seems a class cannot be loaded, but I really don't know what else I should try.
Could someone take another look at the code and the error below? I followed the instructions from here, and my dataset can be found here if you'd like to give it a quick try.
"""
Program: RUNNING GRAPH ANALYTICS WITH SPARK GRAPH-FRAMES:
Author: Dr. C. Hadjinikolis
Date: 14/09/2016
Description: This is the application's core module from where everything is executed.
The module is responsible for:
1. Loading Spark
2. Loading GraphFrames
3. Running analytics by leveraging other modules in the package.
"""
# IMPORT OTHER LIBS -------------------------------------------------------------------------------#
import os
import sys
import pandas as pd
# IMPORT SPARK ------------------------------------------------------------------------------------#
# Path to Spark source folder
USER_FILE_PATH = "/Users/christoshadjinikolis" …

I'm trying to use the graphframes package from pyspark in a Jupyter Notebook (via SageMaker and sparkmagic) on AWS EMR. When creating the EMR cluster in the AWS console, I tried adding a configuration option:
[{"classification":"spark-defaults", "properties":{"spark.jars.packages":"graphframes:graphframes:0.7.0-spark2.4-s_2.11"}, "configurations":[]}]
But I still get an error in the Jupyter notebook when trying to use the graphframes package in my pyspark code.
Here is my code (from the graphframes examples):
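Cluster-level spark-defaults set at EMR creation time often do not reach a sparkmagic/Livy notebook session. A workaround that is frequently reported to help (an assumption worth verifying against your EMR and Livy versions, not something confirmed by the question) is to request the package from inside the notebook with sparkmagic's %%configure magic, in the first cell, before any Spark code runs:

```
%%configure -f
{ "conf": { "spark.jars.packages": "graphframes:graphframes:0.7.0-spark2.4-s_2.11" } }
```

The -f flag forcibly restarts the Livy session so the new configuration takes effect.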
# Create a Vertex DataFrame with unique ID column "id"
v = spark.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = spark.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
], ["src", "dst", …
How do I install graphframes on Google Colab?
I tried, but I get an error when calling !pip install graphframes. I am using Spark 2.4. When I then run g = GraphFrame(v, e) I get:

An error occurred while calling o503.loadClass.: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
I also tried: pyspark --packages graphframes:graphframes:0.8.0-spark2.4-s_2.11
None of the other approaches I found seem to work on the Colab platform.
tl;dr: How do you simplify a graph, merging nodes that share the same name value?
I have a graph defined as follows:
import graphframes
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
vertices = spark.createDataFrame([
('1', 'foo', '1'),
('2', 'bar', '2'),
('3', 'bar', '3'),
('4', 'bar', '5'),
('5', 'baz', '9'),
('6', 'blah', '1'),
('7', 'blah', '2'),
('8', 'blah', '3')
], ['id', 'name', 'value'])
edges = spark.createDataFrame([
('1', '2'),
('1', '3'),
('1', '4'),
('1', '5'),
('5', '6'),
('5', '7'),
('5', '8')
], ['src', 'dst'])
f = graphframes.GraphFrame(vertices, edges)
Starting from the vertex with ID 1, I want to simplify the graph so that nodes with the same name value are merged into a single node. The resulting graph would look like this:
Note how we only have one …
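In GraphFrames this would typically be a groupBy on name plus a join that rewrites edge endpoints. The contraction itself can be sketched in plain Python (merging globally by name, first ID seen winning as the representative -- a simplifying assumption compared with a traversal-ordered merge starting at vertex 1):

```python
def contract_by_name(vertices, edges):
    """Merge vertices that share a name into one representative node,
    rewriting edges and dropping duplicate edges and self-loops.

    vertices: list of (id, name); edges: list of (src, dst)."""
    rep = {}     # name -> representative id (first id seen wins)
    to_rep = {}  # original id -> representative id
    for vid, name in vertices:
        rep.setdefault(name, vid)
        to_rep[vid] = rep[name]
    new_edges = set()
    for src, dst in edges:
        s, d = to_rep[src], to_rep[dst]
        if s != d:               # drop self-loops created by the merge
            new_edges.add((s, d))
    new_vertices = sorted(((vid, name) for name, vid in rep.items()))
    return new_vertices, sorted(new_edges)

# Same vertices and edges as the question (name column only).
vertices = [('1', 'foo'), ('2', 'bar'), ('3', 'bar'), ('4', 'bar'),
            ('5', 'baz'), ('6', 'blah'), ('7', 'blah'), ('8', 'blah')]
edges = [('1', '2'), ('1', '3'), ('1', '4'), ('1', '5'),
         ('5', '6'), ('5', '7'), ('5', '8')]
new_v, new_e = contract_by_name(vertices, edges)
print(new_v)  # → [('1', 'foo'), ('2', 'bar'), ('5', 'baz'), ('6', 'blah')]
print(new_e)  # → [('1', '2'), ('1', '5'), ('5', '6')]
```

On the question's data this leaves one foo, one bar, one baz, and one blah node, with the three duplicate bar edges and three duplicate blah edges each collapsed to a single edge.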
I'm trying to install the PySpark package GraphFrames from the Spark shell:
pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12
However, I get an error like this in the terminal:
root@hpcc:~# pyspark --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12
Python 3.6.9 (default, Jan 26 2021, 15:33:00)
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/root/spark-3.0.2-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.2.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations …

I'm trying to find the most efficient way to take the Map output of the GraphFrames shortestPaths function and flatten each vertex's distance map into individual rows in a new DataFrame. I've managed to do this very clumsily by pulling the distances column into a dictionary, converting that to a pandas DataFrame, and then converting back to a Spark DataFrame, but I know there must be a better way.
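The usual Spark-side answer is to explode the map column -- roughly results.select("id", F.explode("distances")) with pyspark.sql.functions imported as F -- which yields one key/value row per map entry with no pandas round-trip. The reshaping itself, sketched in plain Python ahead of the full GraphFrames example below:

```python
def flatten_distances(rows):
    """Turn (id, {landmark: distance, ...}) rows into one
    (id, landmark, distance) row per map entry -- the same reshaping
    that exploding a MapType column performs in Spark SQL."""
    return [(vid, lm, d)
            for vid, dists in rows
            for lm, d in sorted(dists.items())]

rows = [('a', {'a': 0, 'b': 1, 'c': 2}), ('b', {'b': 0, 'c': 1})]
print(flatten_distances(rows))
# → [('a', 'a', 0), ('a', 'b', 1), ('a', 'c', 2), ('b', 'b', 0), ('b', 'c', 1)]
```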
from graphframes import *
v = sqlContext.createDataFrame([
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
], ["id", "name", "age"])
# Create an Edge DataFrame with "src" and "dst" columns
e = sqlContext.createDataFrame([
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
], ["src", "dst", "relationship"])
# Create a GraphFrame
g = GraphFrame(v, e)
results = g.shortestPaths(landmarks=["a", "b","c"])
results.select("id","distances").show()
+---+--------------------+
| id| distances|
+---+--------------------+
| a|Map(a -> 0, b -> ...|
| b| Map(b -> 0, c -> 1)| …

Trying to run a simple GraphFrame example with pyspark.
Spark version: 2.0
graphframes version: 0.2.0
I can import graphframes in Jupyter:
from graphframes import GraphFrame
GraphFrame
graphframes.graphframe.GraphFrame
I get this error when I try to create a GraphFrame object:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-23-2bf19c66804d> in <module>()
----> 1 gr_links = GraphFrame(df_web_page, df_parent_child_link)
/Users/roopal/software/graphframes-release-0.2.0/python/graphframes/graphframe.pyc in __init__(self, v, e)
60 self._sc = self._sqlContext._sc
61 self._sc._jvm.org.apache.spark.ml.feature.Tokenizer()
---> 62 self._jvm_gf_api = _java_api(self._sc)
63 self._jvm_graph = self._jvm_gf_api.createGraph(v._jdf, e._jdf)
64
/Users/roopal/software/graphframes-release-0.2.0/python/graphframes/graphframe.pyc in _java_api(jsc)
32 def _java_api(jsc):
33 javaClassName = "org.graphframes.GraphFramePythonAPI"
---> 34 return jsc._jvm.Thread.currentThread().getContextClassLoader().loadClass(javaClassName) \
35 .newInstance()
36
/Users/roopal/software/spark-2.0.0-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> …

The following code produces a DataFrame with three values in each column, as shown below.
import org.graphframes._
import org.apache.spark.sql.DataFrame
val v = sqlContext.createDataFrame(List(
("1", "Al"),
("2", "B"),
("3", "C"),
("4", "D"),
("5", "E")
)).toDF("id", "name")
val e = sqlContext.createDataFrame(List(
("1", "3", 5),
("1", "2", 8),
("2", "3", 6),
("2", "4", 7),
("2", "1", 8),
("3", "1", 5),
("3", "2", 6),
("4", "2", 7),
("4", "5", 8),
("5", "4", 8)
)).toDF("src", "dst", "property")
val g = GraphFrame(v, e)
val paths: DataFrame = g.bfs.fromExpr("id = '1'").toExpr("id = '5'").run()
paths.show()
val df=paths
df.select(df.columns.filter(_.startsWith("e")).map(df(_)) : _*).show
Run Code Online (Sandbox Code Playgroud)
The output of the above code is as follows:

+-------+-------+-------+ …
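The column-pruning step at the end relies on the shape of GraphFrames BFS output, whose columns follow the pattern from, e0, v1, e1, …, to, so selecting the edge columns is just a prefix filter. The same idea sketched in plain Python (without Spark):

```python
def columns_with_prefix(columns, prefix):
    """Pick out column names starting with `prefix` -- the same idea as
    df.columns.filter(_.startsWith("e")) in the Scala snippet above."""
    return [c for c in columns if c.startswith(prefix)]

# BFS output columns alternate vertex and edge structs between from and to.
cols = ['from', 'e0', 'v1', 'e1', 'to']
print(columns_with_prefix(cols, 'e'))  # → ['e0', 'e1']
```

In pyspark this becomes df.select([c for c in df.columns if c.startswith('e')]).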
apache-spark ×9
pyspark ×7
hadoop ×2
python ×2
scala ×2
spark-graphx ×2
amazon-emr ×1
dataframe ×1
graph ×1
jupyter ×1