Sparklyr的spark_apply函数似乎在单个执行程序上运行,并且在适度大的数据集上失败

jay*_*jay 2 r apache-spark sparklyr

我试图用来spark_apply在Spark表上运行下面的R函数.如果我的输入表很小(例如5,000行),这可以正常工作,但是当表格适度大(例如5,000,000行)时,约30分钟后会抛出错误: sparklyr worker rscript failure, check worker logs for details

查看Spark UI显示,只创建了一个任务,并且只有一个执行程序应用于此任务.

任何人都可以就500万行数据集失败的原因提出建议吗? 问题可能是单个执行者正在做所有的工作,而失败了吗?

# Create data and copy to Spark
testdf <- data.frame(string_id=rep(letters[1:5], times=1000), # 5000 row table
                 string_categories=rep(c("", "1", "2 3", "4 5 6", "7"), times=1000))
testtbl <- sdf_copy_to(sc, testdf, overwrite=TRUE, repartition=100L, memory=TRUE)

# Write function to return dataframe with strings split out
myFunction <- function(inputdf){
  inputdf$string_categories <- as.character(inputdf$string_categories)
  inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories))
  stringCategoriesList <- strsplit(inputdf$string_categories, ' ')
  outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))),
                  string_categories=unlist(stringCategoriesList))
 return(outDF)
}

# Use spark_apply to run function in Spark
outtbl <- testtbl %>%
  spark_apply(myFunction,
          names=c('string_id', 'string_categories'))
outtbl
Run Code Online (Sandbox Code Playgroud)

Jav*_*chi 5

  1. sparklyr worker rscript failure, check worker logs for details错误由驱动程序节点写入,并指出需要在工作日志中找到实际错误.通常,可以通过stdout从Spark UI中的执行程序选项卡打开来访问工作日志; 日志应包含RScript:描述执行程序正在处理的内容和错误的具体内容的条目.

  2. 关于正在创建的单个任务,如果columns未指定类型spark_apply(),则需要计算结果的子集以猜测列类型,为避免这种情况,请按如下方式传递显式列类型:

    outtbl <- testtbl %>% spark_apply( myFunction, columns=list( string_id = "character", string_categories = "character"))

  3. 如果使用sparklyr 0.6.3,升级到sparklyr 0.6.4devtools::install_github("rstudio/sparklyr"),因为sparklyr 0.6.3在那里包分配启用并不止一个执行中的每个节点运行包含在一些边缘情况不正确的等待时间.

  4. 在高负载下,通常会耗尽内存.增加分区数可以解决此问题,因为它会减少处理此数据集所需的总内存.尝试运行此:

    testtbl %>% sdf_repartition(1000) %>% spark_apply(myFunction, names=c('string_id', 'string_categories'))

  5. 也可能是由于函数中的逻辑,函数抛出了某些分区的异常,你可以通过使用tryCatch()忽略错误然后查找哪些是缺失值以及为什么函数来查看是否是这种情况会失败的那些价值观.我会从以下内容开始:

    myFunction <- function(inputdf){ tryCatch({ inputdf$string_categories <- as.character(inputdf$string_categories) inputdf$string_categories=with(inputdf, ifelse(string_categories=="", "blank", string_categories)) stringCategoriesList <- strsplit(inputdf$string_categories, ' ') outDF <- data.frame(string_id=rep(inputdf$string_id, times=unlist(lapply(stringCategoriesList, length))), string_categories=unlist(stringCategoriesList)) return(outDF) }, error = function(e) { return( data.frame(string_id = c(0), string_categories = c("error")) ) }) }