小编Ale*_*ner的帖子

使用 apache 箭头读取一个 R 数据框中的分区镶木地板目录(所有文件)

如何使用箭头将分区的镶木地板文件读入 R(没有任何火花)

情况

  1. 使用 Spark 管道创建镶木地板文件并保存在 S3 上
  2. 使用 RStudio/RShiny 以一列作为索引读取以进行进一步分析

Parquet 文件结构

从我的 Spark 创建的镶木地板文件由几个部分组成

tree component_mapping.parquet/
component_mapping.parquet/
??? _SUCCESS
??? part-00000-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet
??? part-00001-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet
??? part-00002-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet
??? part-00003-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet
??? part-00004-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet
??? etc
Run Code Online (Sandbox Code Playgroud)

我如何将此 component_mapping.parquet 读入 R?

我试过的

install.packages("arrow")
library(arrow)
my_df<-read_parquet("component_mapping.parquet")
Run Code Online (Sandbox Code Playgroud)

但这因错误而失败

IOError: Cannot open for reading: path 'component_mapping.parquet' is a directory
Run Code Online (Sandbox Code Playgroud)

如果我只读取目录的一个文件,它就可以工作

install.packages("arrow")
library(arrow)
my_df<-read_parquet("component_mapping.parquet/part-00000-e30f9734-71b8-4367-99c4-65096143cc17-c000.snappy.parquet")
Run Code Online (Sandbox Code Playgroud)

但我需要加载所有内容才能对其进行查询

我在文档中发现的

在 apache 箭头文档 https://arrow.apache.org/docs/r/reference/read_parquet.htmlhttps://arrow.apache.org/docs/r/reference/ParquetReaderProperties.html 我发现有一些区域read_parquet() 命令的属性,但我无法让它工作,也找不到任何示例。

read_parquet(file, col_select = NULL, as_data_frame = TRUE, props = ParquetReaderProperties$create(), ...)
Run Code Online (Sandbox Code Playgroud)

如何正确设置属性以读取完整目录?

# should …
Run Code Online (Sandbox Code Playgroud)

r rstudio parquet apache-arrow

10
推荐指数
4
解决办法
3444
查看次数

如何使用 pyspark graphframe pregel API 实现循环检测

我正在尝试使用 Pyspark 和 graphframes 的 pregel 包装器来实现 Rocha & Thatte 的算法(http://cdsid.org.br/sbpo2015/wp-content/uploads/2015/08/142825.pdf )。在这里,我遇到了消息聚合的正确语法问题。

这个想法是直截了当的:

...在每次传递中,G 的每个活动顶点都会向其外围邻居发送一组顶点序列,如下所述。在第一遍中,每个顶点 v 向其所有邻居发送消息 (v)。在后续迭代中,每个活动顶点 v 将 v 附加到它在上一次迭代中接收到的每个序列。然后它将所有更新的序列发送到其外围邻居。如果 v 在上一次迭代中没有收到任何消息,则 v 将自行停用。当所有顶点都已停用时,算法终止。...

我的想法是将顶点 id 发送到目标顶点(dst),并在聚合函数中将它们收集到一个列表中。然后,在我的顶点列“序列”中,我想将这个新列表项与现有列表项追加/合并,然后使用 when 语句检查当前顶点 id 是否已在序列中。然后我可以根据顶点列将顶点设置为 true 以将它们标记为循环。但我在 Spark 中找不到关于如何连接它的正确语法。有人有想法吗?或者实施类似的东西?

我当前的代码

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, col, lit, sum, when

from graphframes import GraphFrame
from graphframes.lib import *



SimpleCycle=[
    ("1","2"),
    ("2","3"),
    ("3","4"),
    ("4","5"),
    ("5","2"),
    ("5","6") …
Run Code Online (Sandbox Code Playgroud)

graph-theory spark-graphx pyspark graphframes pregel

3
推荐指数
1
解决办法
2932
查看次数