如何使用SparkR映射和减少操作?我所能找到的只是有关SQL查询的内容。有没有一种方法可以使用SQL进行映射和归约?
请原谅我的简单问题,但我对Spark/Hadoop相对较新.
我正在尝试将一堆小的CSV文件加载到Apache Spark中.它们目前存储在S3中,但如果能简化,我可以在本地下载它们.我的目标是尽可能高效地完成这项工作.看起来让我的数十名Spark工作人员无所事事地让一些单线程主人下载并解析一堆CSV文件会是一种耻辱.我希望有一种惯用的方式来分发这项工作.
CSV文件排列在一个目录结构中,如下所示:
2014/01-01/fileabcd.csv
2014/01-01/filedefg.csv
...
Run Code Online (Sandbox Code Playgroud)
我有两年的数据,每天都有目录,每个目录里面都有几百个CSV.所有这些CSV应该具有相同的模式,但是当然可能一个CSV出错并且如果存在一些有问题的文件,我讨厌整个作业崩溃.只要我在某个日志中通知我发生了这些文件,就可以跳过这些文件.
似乎我想到的每个Spark项目都采用相同的形式,我不知道如何解决它.(例如,尝试读取一组制表符分隔的天气数据,或者阅读一堆日志文件来查看这些数据.)
我已经尝试过SparkR和Scala库.我真的不在乎我需要使用哪种语言; 我对使用正确的习语/工具更感兴趣.
我原来的想法是枚举和parallelize所有year/mm-dd组合的列表,以便我可以让我的Spark工作人员每天独立处理(下载并解析所有CSV文件,然后将它们堆叠在彼此之上(unionAll())以减少它们).遗憾的是,使用spark-csv库下载和解析CSV文件只能在"父"/主作业中完成,而不能在每个子项中完成,因为Spark不允许作业嵌套.因此,只要我想使用Spark库进行导入/解析,这将无法工作.
当然,您可以使用语言的本机CSV解析来读取每个文件,然后将它们"上传"到Spark.在R中,这是一些包的组合,用于从S3获取文件,然后用a read.csv完成,createDataFrame()以获取数据到Spark.不幸的是,这真的很慢,而且似乎也是我希望Spark工作的方式.如果我的所有数据都是通过R管道进入Spark之前,为什么还要费心?
我开始研究这些量身定制的工具,很快就不堪重负.我的理解是,可以使用许多/所有这些工具将我的CSV文件从S3转换为HDFS.
当然,从HDFS读取我的CSV文件比S3更快,因此解决了部分问题.但我仍然需要解析成千上万的CSV,并且不知道在Spark中使用分布式方法.
我是Spark的新手,想知道下面有哪些选项可以使用SparkR从RStudio读取存储在hdfs中的数据,或者我是否正确使用它们.数据可以是任何类型(纯文本,csv,json,xml或任何包含关系表的数据库)和任何大小(1kb - 几gb).
我知道不应该再使用textFile(sc,path)了,除了read.df函数之外还有其他可能读取这类数据吗?
以下代码使用read.df和jsonFile但jsonFile会产生错误:
Sys.setenv(SPARK_HOME = "C:\\Users\\--\\Downloads\\spark-1.5.0-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
#load the Sparkr library
library(SparkR)
# Create a spark context and a SQL context
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
#create a sparkR DataFrame
df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/people.json", source = "json")
df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")
Run Code Online (Sandbox Code Playgroud)
read.df适用于json,但是如何读取仅由新行分隔的日志消息等文本?例如
> df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/README.txt", "text")
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
java.lang.ClassNotFoundException: Failed to load class for data source: text.
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
at …Run Code Online (Sandbox Code Playgroud) 如何在RStudio上将csv文件加载到SparkR?以下是我在RStudio上运行SparkR时必须执行的步骤.我用read.df读取.csv不知道怎么写这个.不确定此步骤是否被视为创建RDD.
#Set sys environment variables
Sys.setenv(SPARK_HOME = "C:/Users/Desktop/spark/spark-1.4.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
#Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.0.3" "sparkr-shell"')
#Load libraries
library(SparkR)
library(magrittr)
sc <- sparkR.init(master="local")
sc <- sparkR.init()
sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
data <- read.df(sqlContext, "C:/Users/Desktop/DataSets/hello_world.csv", "com.databricks.spark.csv", header="true")
Run Code Online (Sandbox Code Playgroud)
我收到错误:
Error in writeJobj(con, object) : invalid jobj 1
Run Code Online (Sandbox Code Playgroud) 我对SparkR. 当我跑的时候sparkR,出了点问题。
sc <- sparkR.init(master="local")
Run Code Online (Sandbox Code Playgroud)
像这样的错误:
Error in socketConnection(port = monitorPort) :
cannot open the connection
In addition: Warning message:
In socketConnection(port = monitorPort) : localhost:32811 cannot be opened
Run Code Online (Sandbox Code Playgroud)
有人告诉我这是本地主机问题,但是当我将本地主机更改为本地 IP 地址时它不起作用。本地主机设置真的有问题吗?
将 DataFrame 对象放入本地内存时,as.data.frame()和之间有什么区别collect()?
No status is returned. Java SparkR backend might have failed.使用Spark安装glm时,我遇到了错误.该作业实际上似乎基于Spark web ui运行完成,但在模型拟合期间(它似乎不是一致的位置),SparkR返回上述错误消息然后返回到R REPL.我没有看到任何我可以参考的日志来识别问题.问题回答者会指出我的日志,还是提供有关此问题的其他反馈?
我可以看到错误生成代码在这里.它看起来好像是由get(".sparkRCon", .sparkREnv)just 指定的连接不存在或者在计算过程中对空字符串做出虚假响应?我不知所措.
我2.0.0使用Amazon EMR 在Spark上5.0.
语境
我正在使用 rstudio 和 sparkR 包开发一个 azure HDI R 服务器集群。我正在读取文件,修改它,然后我想用 write.df 写它,但问题是当我写文件时,我的列名消失了。
我的代码如下:
write.df(spdf,"///Output/File","csv","overwrite",header=T)
Run Code Online (Sandbox Code Playgroud)
这是我想以 csv 格式写入的文件
Num,Letter
5.5,a
9.,b
5.5,c
9,d
5.5,e
9,f
5.5,g
9,h
5.5,i
9,j
Run Code Online (Sandbox Code Playgroud)
这是我得到的文件:
0,1
5.5,a
9.,b
5.5,c
9,d
5.5,e
9,f
5.5,g
9,h
5.5,i
9,j
Run Code Online (Sandbox Code Playgroud) 我试图在每个时间步读取数据并将其写入文件。
\n\n为此,我使用该包h5来存储大型数据集,但我发现使用该包的功能的代码运行缓慢。我正在处理非常大的数据集。所以,我有内存限制问题。这是一个可重现的示例:
library(ff)\nlibrary(h5)\nset.seed(12345)\nfor(t in 1:3650){\n\n print(t)\n\n ## Initialize the matrix to fill\n mat_to_fill <- ff(-999, dim=c(7200000, 48), dimnames=list(NULL, paste0("P", as.character(seq(1, 48, 1)))), vmode="double", overwrite = T) \n ## print(mat_to_fill)\n ## summary(mat_to_fill[,])\n\n ## Create the output file\n f_t <- h5file(paste0("file",t,".h5"))\n\n ## Retrieve the matrix at t - 1 if t > 1\n if(t > 1){\n f_t_1 <- h5file(paste0("file", t - 1,".h5"))\n mat_t_1 <- f_t_1["testmat"][] ## *********** ##\n ## f_t_1["testmat"][]\n\n } else {\n\n mat_t_1 <- 0\n\n }\n\n ## …Run Code Online (Sandbox Code Playgroud) 我在 SparkRDataframe 上运行 gapply 函数,如下所示
df<-gapply(sp_Stack, function(key,e) {
Sys.setlocale('LC_COLLATE','C')
suppressPackageStartupMessages({
library(Rcpp)
library(Matrix)
library(reshape)
require(parallel)
require(lubridate)
library(plyr)
library(reticulate)
library(stringr)
library(data.table)
})
calcDecsOnly(e,RequestNumber=RequestNumber,
...)
},cols="udim",schema=schema3)
Run Code Online (Sandbox Code Playgroud)
如果我们设置 spark.sql.execution.arrow.sparkr.enabled = "false" 上面的代码运行没有任何错误,但如果我设置 spark.sql.execution.arrow.sparkr.enabled = "true" 火花作业失败低于错误
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.r.ArrowRRunner$$anon$2.read(ArrowRRunner.scala:154)
Run Code Online (Sandbox Code Playgroud)
环境:Google Cloud Dataproc Spark 版本:3.1.1 Dataproc 版本:基于 2.0.9-debian10 构建的自定义映像
感谢这里的任何帮助,提前致谢
sparkr ×10
apache-spark ×6
r ×6
amazon-s3 ×1
apache-arrow ×1
azure ×1
csv ×1
glm ×1
hadoop ×1
performance ×1