我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.
我有一个简单的工作流程:
filter 下到一小部分行 select 一小部分列collect进入驱动程序节点(所以我可以做其他操作R)当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.
我试过运行以下设置:
对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者
java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.(sparklyr表示存储器问题的错误).
根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.
请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.
我的问题:
谢谢
编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …
我试图改变火花写临时文件的位置.我在网上找到的所有内容都说通过SPARK_LOCAL_DIRS在spark-env.sh文件中设置参数来设置它,但我没有任何运气实际生效的更改.
这就是我所做的:
sparklyr包作为前端.使用自动缩放组旋转工作节点./tmp/jaytest.每个工人中都有一个,主人中有一个.home/ubuntu/spark-2.2.0-bin-hadoop2.7/conf/spark-env.sh并修改文件以包含此行:SPARK_LOCAL_DIRS="/tmp/jaytest"每个spark-env.sh文件的权限是-rwxr-xr-x,并且对于jaytest文件夹是drwxrwxr-x.
据我所知,这符合我在网上阅读的所有建议.但是,当我将一些数据加载到集群中时,它仍然会结束/tmp,而不是/tmp/jaytest.
我也尝试将spark.local.dir参数设置为同一目录,但也没有运气.
有人可以告诉我这里可能缺少什么吗?
编辑:我将其作为独立群集运行(因为下面的答案表明要设置的正确参数取决于群集类型).
我有一个数据集df,我想删除变量y没有值的所有行a.变量y还包含一些NAs:
df <- data.frame(x=1:3, y=c('a', NA, 'c'))
Run Code Online (Sandbox Code Playgroud)
我可以使用R的索引语法实现这一点,如下所示:
df[df$y!='a',]
x y
2 <NA>
3 c
Run Code Online (Sandbox Code Playgroud)
注意这会返回NA值和值c- 这就是我想要的.
然而,当我使用同样的尝试subset或者dplyr::filter,将NA被剥离出来:
subset(df, y!='a')
x y
3 c
dplyr::filter(df, y!='a')
x y
3 c
Run Code Online (Sandbox Code Playgroud)
为什么subset而dplyr::filter这样的工作吗?这对我来说似乎不合逻辑 - 这NA是不一样的a,所以为什么要删除NA当我指定我想要除变量y等于的那些行之外的所有行a?
除了明确要求NAs返回之外,还有一些方法可以改变这些函数的行为,即
subset(df, y!='a' | is.na(y))
Run Code Online (Sandbox Code Playgroud)
谢谢
我试图用来spark_apply在Spark表上运行下面的R函数.如果我的输入表很小(例如5,000行),这可以正常工作,但是当表格适度大(例如5,000,000行)时,约30分钟后会抛出错误:
sparklyr worker rscript failure, check worker logs for details
查看Spark UI显示,只创建了一个任务,并且只有一个执行程序应用于此任务.
任何人都可以就500万行数据集失败的原因提出建议吗? 问题可能是单个执行者正在做所有的工作,而失败了吗?
# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)
# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
inputdf$string_categories <- as.character(inputdf$string_categories)
inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
string_categories=unlist(stringCategoriesList))
return(outDF)
} …Run Code Online (Sandbox Code Playgroud)