jay*_*jay · 13 · tags: memory, apache-spark, sparklyr
I know there are plenty of questions about memory errors in Spark, but I haven't found a solution to mine.
I have a simple workflow:

1. filter down to a small subset of rows,
2. select a small subset of columns, and
3. collect into the driver node (so I can do additional operations in R).

When I run the above and then cache the table into Spark memory, it takes up <2GB - tiny compared with the memory available to my cluster - but then I get an OOM error when I try to collect the data into my driver node.
I have tried a number of settings. For each of these I played with multiple configurations of executor.memory, driver.memory and driver.maxResultSize, covering the full range of possible values given my available memory, but I always end up with an out-of-memory error at the collect stage; either

java.lang.OutOfMemoryError: Java heap space

java.lang.OutOfMemoryError: GC overhead limit exceeded

or

Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
  No status is returned.

(a sparklyr error indicating a memory problem).
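For reference, this is roughly how those three settings are passed through sparklyr in my case - a condensed version of the config code given in full further down (the actual values varied between runs):

config <- spark_config()
config$`sparklyr.shell.driver-memory`   <- '50g'   # driver.memory
config$`sparklyr.shell.executor-memory` <- '50g'   # executor.memory
config$spark.driver.maxResultSize       <- '50g'   # driver.maxResultSize
sc <- spark_connect(master='local', config=config, version='2.0.1')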
From my (limited) understanding of Spark, caching a table before collecting it should force all computation - i.e. if the table is sitting happily in memory after caching at <2GB, then I shouldn't need much more than 2GB of memory to collect it into the driver node.
Note that the answer to this question has some suggestions I have not yet tried, but those are likely to hurt performance (e.g. serialising the RDD), so I would like to avoid them if possible.
My questions: how is it possible that a table which takes <2GB when cached to Spark memory needs far more than that to collect into the driver node, and what can I do to make the collect work?

Thanks
EDIT: note, following @Shaido's comment below, that cache'ing via sparklyr "forces data to be loaded in memory by executing a count(*) over the table" [from the sparklyr documentation] - i.e. the table should already be in memory and all computation run (I believe) before collect is called.
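A minimal sketch of what I mean by cache'ing here (as I understand it, tbl_cache()'s force argument, which defaults to TRUE, is what triggers the count(*); an explicit row count via sdf_nrow() forces the same computation):

# cache the registered table and force materialisation via count(*)
tbl_cache(sc, "filtereddf", force = TRUE)
# an explicit row count also forces the computation to run
sdf_nrow(tbl(sc, "filtereddf"))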
EDIT: some additional observations after following the advice below:
With driver.maxResultSize set to <1G, I get an error stating that the size of the serialized RDD is 1030 MB, larger than driver.maxResultSize.

Watching memory usage on the driver node during collect, I see it keep climbing until it reaches ~90GB, at which point the OOM error occurs. So, for whatever reason, the amount of RAM used to perform the collect operation is roughly 100x the size of the RDD I am trying to collect.

EDIT: code added below, as requested in the comments.
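(For anyone wondering where the <2GB figure comes from: one way to check the in-memory size of a cached table is the Storage tab of the Spark UI, which can be opened straight from sparklyr - a minimal sketch:)

# open the Spark web UI; the Storage tab shows the size of cached tables
spark_web(sc)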
#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop")
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g'
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
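# NB: with master='local' the driver and executors share a single JVM, so (as I
# understand it) the driver-memory setting above is what effectively caps the session.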
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
  invoke("read") %>%
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>%
  invoke("createOrReplaceTempView", "alldatadf")
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2)
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7)
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>%    # create a reference to the sparkdf without loading into memory
  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1 & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
          (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
          (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
          (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) &
          !is.na(advertiser_id)) |
         ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 |
           floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data)) %>%
  mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)
Answer (小智 · 5):
When you call collect on a dataframe, two things happen: the results are serialized and sent from all the executors to the driver, and the driver then has to hold the entire dataset in its own memory.

Answer:

If you just want the data loaded into the executors' memory, count() is also an action that loads the data into the executors' memory, where it can be reused by other processes.

If you want to extract the data, then try "--conf spark.driver.maxResultSize=10g" along with the other properties when pulling the data.
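A rough sketch of what those two suggestions look like from the sparklyr side (names reused from the question's code; the 10g value is just the example above and should be tuned to the data):

# Option 1: just materialise the data in the executors' memory - any action
# such as a count is enough, and the cached result can be reused by later jobs
sdf_nrow(dftbl)   # or: tbl_cache(sc, "filtereddf", force = TRUE)

# Option 2: if the data really must come back to the driver, raise the result
# size cap (the config equivalent of --conf spark.driver.maxResultSize=10g)
# in spark_config() before connecting, alongside a generous driver memory setting
config$spark.driver.maxResultSize <- '10g'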