标签: graphframes

如何使用java创建一个简单的spark graphframe?

基本上我是一个java开发人员,现在我有机会参与Spark,我经历了Spark api的基础知识,比如SparkConfig,SparkContaxt,RDD,SQLContaxt,DataFrame,DataSet然后我能够使用它来执行一些简单的简单转换RDD,SQL ....但是当我尝试使用java训练一些示例graphframe应用程序时,我可以成功并且我经历了很多youtube教程,论坛和stackoverflow线程但没有我没有找到任何直接建议当我尝试为GraphFrame类创建一个对象时,我实际上遇到了这个问题,我也下载了接收jar(graphframes-0.2.0-spark2.0-s_2.11.jar)但现在仍然面临问题我想放我的分析直到我到达的地方由于Spark的新东西我无法进一步移动所以如果有人帮助我它对所有人都非常有帮助.提前致谢.我面临的例外是构造函数GraphFrame(DataFrame,DataFrame)未定义

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import org.graphframes.GraphFrame;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;

public class SparkJavaGraphFrameOne {

    public static void main(String[] args) throws JsonParseException, JsonMappingException, IOException{

        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

        JavaRDD<Row> verRow = sc.parallelize(Arrays.asList(RowFactory.create(1,"A"),RowFactory.create(2,"B")));
        JavaRDD<Row> edgRow = sc.parallelize(Arrays.asList(RowFactory.create(1,2,"Edge")));     

        List<StructField> verFields …
Run Code Online (Sandbox Code Playgroud)

java apache-spark graphframes

3
推荐指数
1
解决办法
2483
查看次数

Apache-Spark图形框架中的SBT

我有以下SBT文件,我正在使用Apache GraphFrame编译Scala代码并读取CSV文件.

name := "Simple"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(

"org.apache.spark" %% "spark-core" % "1.6.1",

"graphframes" % "graphframes" % "0.2.0-spark1.6-s_2.10",

"org.apache.spark" %% "spark-sql" % "1.0.0",

"com.databricks" % "spark-csv" % "1.0.3"
)
Run Code Online (Sandbox Code Playgroud)

这是我在斯卡拉的代码

import org.graphframes._
import org.apache.spark.sql.DataFrame
    val nodesList = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/Users/Desktop/GraphFrame/NodesList.csv")
    val edgesList= sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/Users/Desktop/GraphFrame/EdgesList.csv")
    val v=nodesList.toDF("id", "name")
    val e=edgesList.toDF("src", "dst", "dist")
    val g = GraphFrame(v, e)
Run Code Online (Sandbox Code Playgroud)

当我尝试使用SBT制作Jar文件时,它在编译期间给出了以下错误

[trace] Stack trace suppressed: run last *:update for the full output.
[error] (*:update) sbt.ResolveException: unresolved dependency: …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark graphframes

3
推荐指数
2
解决办法
1663
查看次数

PYSPARK:如何可视化 GraphFrame?

假设我创建了下图。我的问题是如何可视化它?

 # Create a Vertex DataFrame with unique ID column "id"
    v = sqlContext.createDataFrame([
      ("a", "Alice", 34),
      ("b", "Bob", 36),
      ("c", "Charlie", 30),
    ], ["id", "name", "age"])
    # Create an Edge DataFrame with "src" and "dst" columns
    e = sqlContext.createDataFrame([
      ("a", "b", "friend"),
      ("b", "c", "follow"),
      ("c", "b", "follow"),
    ], ["src", "dst", "relationship"])
    # Create a GraphFrame
    from graphframes import *
    g = GraphFrame(v, e)
Run Code Online (Sandbox Code Playgroud)

python graph apache-spark pyspark graphframes

3
推荐指数
1
解决办法
1万
查看次数

如何仅从 Edge DataFrame 制作 GraphFrame

由此看来 “GraphFrame 也可以从包含边信息的单个 DataFrame 构建。顶点将从边的源和目的地推断出来。”

然而,当我查看其API 文档时,似乎无法创建一个。

有人尝试仅使用边缘 DataFrame 创建 GraphFrame 吗?如何?

apache-spark apache-spark-sql pyspark graphframes

3
推荐指数
2
解决办法
1710
查看次数

如何使用 pyspark graphframe pregel API 实现循环检测

我正在尝试使用 Pyspark 和 graphframes 的 pregel 包装器来实现 Rocha & Thatte 的算法(http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf )。在这里,我遇到了消息聚合的正确语法问题。

这个想法是直截了当的:

...在每次传递中,G 的每个活动顶点都会向其外围邻居发送一组顶点序列,如下所述。在第一遍中,每个顶点 v 向其所有邻居发送消息 (v)。在后续迭代中,每个活动顶点 v 将 v 附加到它在上一次迭代中接收到的每个序列。然后它将所有更新的序列发送到其外围邻居。如果 v 在上一次迭代中没有收到任何消息,则 v 将自行停用。当所有顶点都已停用时,算法终止。...

我的想法是将顶点 id 发送到目标顶点(dst),并在聚合函数中将它们收集到一个列表中。然后,在我的顶点列“序列”中,我想将这个新列表项与现有列表项追加/合并,然后使用 when 语句检查当前顶点 id 是否已在序列中。然后我可以根据顶点列将顶点设置为 true 以将它们标记为循环。但我在 Spark 中找不到关于如何连接它的正确语法。有人有想法吗?或者实施类似的东西?

我当前的代码

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when

from graphframes import GraphFrame
from graphframes.lib import *



SimpleCycle=[
    ("1","2"),
    ("2","3"),
    ("3","4"),
    ("4","5"),
    ("5","2"),
    ("5","6") …
Run Code Online (Sandbox Code Playgroud)

graph-theory spark-graphx pyspark graphframes pregel

3
推荐指数
1
解决办法
2932
查看次数

如何从 Pyspark 中的火花数据框创建边列表?

graphframes在 pyspark 中使用某种图形类型的分析,想知道从顶点数据框创建边列表数据框的最佳方法是什么。

例如,下面是我的顶点数据框。我有一个 ID 列表,它们属于不同的组。

+---+-----+
|id |group|
+---+-----+
|a  |1    |
|b  |2    |
|c  |1    |
|d  |2    |
|e  |3    |
|a  |3    |
|f  |1    |
+---+-----+
Run Code Online (Sandbox Code Playgroud)

我的目标是创建一个边缘列表数据框来指示出现在公共组中的 id。请注意,1 个 id 可能出现在多个组中(例如,上面的 id a 在组 1 和 3 中)。以下是我想获得的边缘列表数据框:

+---+-----+-----+
|src|dst  |group|
+---+-----+-----+
|a  |c    |1    |
|a  |f    |1    |
|c  |f    |1    |
|b  |d    |2    |
|a  |e    |3    |
+---+-----+-----+
Run Code Online (Sandbox Code Playgroud)

提前致谢!

python apache-spark apache-spark-sql pyspark graphframes

2
推荐指数
1
解决办法
342
查看次数

在 PyCharm 中使用图形框架

我花了将近 2 天的时间在互联网上滚动,但我无法解决这个问题。我正在尝试安装graphframes 包(版本:0.2.0-spark2.0-s_2.11)以通过 PyCharm 运行 spark,但是,尽管我尽了最大努力,但这是不可能的。

我几乎尝试了所有方法。请知道,在发布答案之前,我也在这里检查了这个网站。

这是我试图运行的代码:

# IMPORT OTHER LIBS --------------------------------------------------------
import os
import sys
import pandas as pd

# IMPORT SPARK ------------------------------------------------------------------------------------#
# Path to Spark source folder
USER_FILE_PATH = "/Users/<username>"
SPARK_PATH = "/PycharmProjects/GenesAssociation"
SPARK_FILE = "/spark-2.0.0-bin-hadoop2.7"
SPARK_HOME = USER_FILE_PATH + SPARK_PATH + SPARK_FILE
os.environ['SPARK_HOME'] = SPARK_HOME

# Append pySpark to Python Path
sys.path.append(SPARK_HOME + "/python")
sys.path.append(SPARK_HOME + "/python" + "/lib/py4j-0.10.1-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql …
Run Code Online (Sandbox Code Playgroud)

python installation pycharm pyspark graphframes

1
推荐指数
1
解决办法
2576
查看次数

如何在 Spark/Scala 中将 Array[String] 转换为 Array[Any]

我正在尝试为 Graphframes 内的 parallelPersonalizedPageRank 算法生成 sourceIds 并调用该算法,如下所示:

val PPRIdCS = studentCS.select("id").collect.map(row => row.getString(0))
val ranksCS = studentGraph
  .parallelPersonalizedPageRank
  .resetProbability(0.15)
  .maxIter(10)
  .sourceIds(PPRIdCS)
  .run()
Run Code Online (Sandbox Code Playgroud)

我得到的错误信息如下:

Message: <console>:46: error: type mismatch;
found   : Array[String]
required: Array[Any]
Note: String <: Any, but class Array is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 
3.2.10).sourceIds(PPRIdCS)
Run Code Online (Sandbox Code Playgroud)

我无法弄清楚将 String 类型转换为 Any 类型的方法是什么,或者在生成 PPRIdCS 时将 String 映射到 Any 的方法。谢谢!

arrays scala graph apache-spark graphframes

1
推荐指数
1
解决办法
4967
查看次数