标签: sparkr

如何在SparkR中进行映射和归约

如何使用SparkR映射和减少操作?我所能找到的只是有关SQL查询的内容。有没有一种方法可以使用SQL进行映射和归约?

apache-spark sparkr

5
推荐指数
1
解决办法
1672
查看次数

在Spark中有效地聚合许多CSV

请原谅我的简单问题,但我对Spark/Hadoop相对较新.

我正在尝试将一堆小的CSV文件加载到Apache Spark中.它们目前存储在S3中,但如果能简化,我可以在本地下载它们.我的目标是尽可能高效地完成这项工作.看起来让我的数十名Spark工作人员无所事事地让一些单线程主人下载并解析一堆CSV文件会是一种耻辱.我希望有一种惯用的方式来分发这项工作.

CSV文件排列在一个目录结构中,如下所示:

2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
Run Code Online (Sandbox Code Playgroud)

我有两年的数据,每天都有目录,每个目录里面都有几百个CSV.所有这些CSV应该具有相同的模式,但是当然可能一个CSV出错并且如果存在一些有问题的文件,我讨厌整个作业崩溃.只要我在某个日志中通知我发生了这些文件,就可以跳过这些文件.

似乎我想到的每个Spark项目都采用相同的形式,我不知道如何解决它.(例如,尝试读取一组制表符分隔的天气数据,或者阅读一堆日志文件来查看这些数据.)

我试过的

我已经尝试过SparkR和Scala库.我真的不在乎我需要使用哪种语言; 我对使用正确的习语/工具更感兴趣.

纯斯卡拉

我原来的想法是枚举和parallelize所有year/mm-dd组合的列表,以便我可以让我的Spark工作人员每天独立处理(下载并解析所有CSV文件,然后将它们堆叠在彼此之上(unionAll())以减少它们).遗憾的是,使用spark-csv库下载和解析CSV文件只能在"父"/主作业中完成,而不能在每个子项中完成,因为Spark不允许作业嵌套.因此,只要我想使用Spark库进行导入/解析,这将无法工作.

混合语言

当然,您可以使用语言的本机CSV解析来读取每个文件,然后将它们"上传"到Spark.在R中,这是一些包的组合,用于从S3获取文件,然后用a read.csv完成,createDataFrame()以获取数据到Spark.不幸的是,这真的很慢,而且似乎也是我希望Spark工作的方式.如果我的所有数据都是通过R管道进入Spark之前,为什么还要费心?

Hive/Sqoop/Phoenix/Pig/Flume/Flume Ng/s3distcp

我开始研究这些量身定制的工具,很快就不堪重负.我的理解是,可以使用许多/所有这些工具将我的CSV文件从S3转换为HDFS.

当然,从HDFS读取我的CSV文件比S3更快,因此解决了部分问题.但我仍然需要解析成千上万的CSV,并且不知道在Spark中使用分布式方法.

csv amazon-s3 apache-spark sparkr

5
推荐指数
1
解决办法
865
查看次数

使用SparkR 1.5从RStudio中的hdfs读取大文件(纯文本,xml,json,csv)的选项

我是Spark的新手,想知道下面有哪些选项可以使用SparkR从RStudio读取存储在hdfs中的数据,或者我是否正确使用它们.数据可以是任何类型(纯文本,csv,json,xml或任何包含关系表的数据库)和任何大小(1kb - 几gb).

我知道不应该再使用textFile(sc,path)了,除了read.df函数之外还有其他可能读取这类数据吗?

以下代码使用read.df和jsonFile但jsonFile会产生错误:

Sys.setenv(SPARK_HOME = "C:\\Users\\--\\Downloads\\spark-1.5.0-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
#load the Sparkr library
library(SparkR)

# Create a spark context and a SQL context
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

#create a sparkR DataFrame
df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/people.json", source = "json")
df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")
Run Code Online (Sandbox Code Playgroud)

read.df适用于json,但是如何读取仅由新行分隔的日志消息等文本?例如

> df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/README.txt", "text")
     Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.lang.ClassNotFoundException: Failed to load class for data source: text.
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at …
Run Code Online (Sandbox Code Playgroud)

r sparkr apache-spark-1.5

5
推荐指数
1
解决办法
1672
查看次数

如何在RStudio上将csv文件加载到SparkR?

如何在RStudio上将csv文件加载到SparkR?以下是我在RStudio上运行SparkR时必须执行的步骤.我用read.df读取.csv不知道怎么写这个.不确定此步骤是否被视为创建RDD.

#Set sys environment variables
Sys.setenv(SPARK_HOME = "C:/Users/Desktop/spark/spark-1.4.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

#Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.0.3" "sparkr-shell"')

#Load libraries
library(SparkR)
library(magrittr)

sc <- sparkR.init(master="local")
sc <- sparkR.init()
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

data <- read.df(sqlContext, "C:/Users/Desktop/DataSets/hello_world.csv", "com.databricks.spark.csv", header="true")
Run Code Online (Sandbox Code Playgroud)

我收到错误:

Error in writeJobj(con, object) : invalid jobj 1
Run Code Online (Sandbox Code Playgroud)

r apache-spark apache-spark-sql sparkr

5
推荐指数
1
解决办法
1611
查看次数

sparkR 中的 socketConnection 错误

我对SparkR. 当我跑的时候sparkR,出了点问题。

sc <- sparkR.init(master="local")
Run Code Online (Sandbox Code Playgroud)

像这样的错误:

Error in socketConnection(port = monitorPort) : 
  cannot open the connection
In addition: Warning message:
In socketConnection(port = monitorPort) : localhost:32811 cannot be opened
Run Code Online (Sandbox Code Playgroud)

有人告诉我这是本地主机问题,但是当我将本地主机更改为本地 IP 地址时它不起作用。本地主机设置真的有问题吗?

r sparkr

5
推荐指数
1
解决办法
562
查看次数

sparkR中collect和as.data.frame的区别

将 DataFrame 对象放入本地内存时,as.data.frame()和之间有什么区别collect()

r apache-spark sparkr

5
推荐指数
1
解决办法
571
查看次数

如何调试/获取SparkR Java后端故障的日志?

No status is returned. Java SparkR backend might have failed.使用Spark安装glm时,我遇到了错误.该作业实际上似乎基于Spark web ui运行完成,但在模型拟合期间(它似乎不是一致的位置),SparkR返回上述错误消息然后返回到R REPL.我没有看到任何我可以参考的日志来识别问题.问题回答者会指出我的日志,还是提供有关此问题的其他反馈?

我可以看到错误生成代码在这里.它看起来好像是由get(".sparkRCon", .sparkREnv)just 指定的连接不存在或者在计算过程中对空字符串做出虚假响应?我不知所措.

2.0.0使用Amazon EMR 在Spark上5.0.

glm apache-spark sparkr

5
推荐指数
1
解决办法
1329
查看次数

使用 sparkR write.df 写入 sparkdataframe 时丢失列名

语境

我正在使用 rstudio 和 sparkR 包开发一个 azure HDI R 服务器集群。我正在读取文件,修改它,然后我想用 write.df 写它,但问题是当我写文件时,我的列名消失了。

我的代码如下:

write.df(spdf,"///Output/File","csv","overwrite",header=T)
Run Code Online (Sandbox Code Playgroud)

这是我想以 csv 格式写入的文件

Num,Letter
5.5,a
9.,b
5.5,c
9,d
5.5,e
9,f
5.5,g
9,h
5.5,i
9,j
Run Code Online (Sandbox Code Playgroud)

这是我得到的文件:

    0,1
    5.5,a
    9.,b
    5.5,c
    9,d
    5.5,e
    9,f
    5.5,g
    9,h
    5.5,i
    9,j
Run Code Online (Sandbox Code Playgroud)

hadoop r azure sparkr spark-dataframe

5
推荐指数
0
解决办法
369
查看次数

使用 R 通过循环读取数据并将数据写入文件的有效方法

我试图在每个时间步读取数据并将其写入文件。

\n\n

为此,我使用该包h5来存储大型数据集,但我发现使用该包的功能的代码运行缓慢。我正在处理非常大的数据集。所以,我有内存限制问题。这是一个可重现的示例:

\n\n
library(ff)\nlibrary(h5)\nset.seed(12345)\nfor(t in 1:3650){\n\n  print(t)\n\n  ## Initialize the matrix to fill\n  mat_to_fill <- ff(-999, dim=c(7200000, 48), dimnames=list(NULL, paste0("P", as.character(seq(1, 48, 1)))), vmode="double", overwrite = T) \n  ## print(mat_to_fill)\n  ## summary(mat_to_fill[,])\n\n  ## Create the output file\n  f_t <- h5file(paste0("file",t,".h5"))\n\n  ## Retrieve the matrix at t - 1 if t > 1\n  if(t > 1){\n    f_t_1 <- h5file(paste0("file", t - 1,".h5"))\n    mat_t_1 <- f_t_1["testmat"][] ## *********** ##\n    ## f_t_1["testmat"][]\n\n  } else {\n\n    mat_t_1 <- 0\n\n  }\n\n  ## …
Run Code Online (Sandbox Code Playgroud)

performance r sparkr

5
推荐指数
1
解决办法
864
查看次数

如果启用 Apache Arrow,SparkR 代码将失败

我在 SparkRDataframe 上运行 gapply 函数,如下所示

 df<-gapply(sp_Stack, function(key,e) {
    
      Sys.setlocale('LC_COLLATE','C')
   
      suppressPackageStartupMessages({ 
      library(Rcpp)
      library(Matrix)
      library(reshape)
      require(parallel)
      require(lubridate)
      library(plyr)
      library(reticulate)
      library(stringr)
      
      library(data.table)
   })
      calcDecsOnly(e,RequestNumber=RequestNumber,
                   ...)
    },cols="udim",schema=schema3)
Run Code Online (Sandbox Code Playgroud)

如果我们设置 spark.sql.execution.arrow.sparkr.enabled = "false" 上面的代码运行没有任何错误,但如果我设置 spark.sql.execution.arrow.sparkr.enabled = "true" 火花作业失败低于错误

Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.execution.r.ArrowRRunner$$anon$2.read(ArrowRRunner.scala:154)
Run Code Online (Sandbox Code Playgroud)

环境:Google Cloud Dataproc Spark 版本:3.1.1 Dataproc 版本:基于 2.0.9-debian10 构建的自定义映像

感谢这里的任何帮助,提前致谢

apache-spark sparkr google-cloud-dataproc apache-arrow

5
推荐指数
0
解决办法
134
查看次数