createDataFrame中的SparkR瓶颈?

Krz*_*ski 5 r apache-spark sparkr

我是Spark,SparkR以及所有与HDFS相关的技术的新手.我最近安装了Spark 1.5.0并使用SparkR运行一些简单的代码:

Sys.setenv(SPARK_HOME="/private/tmp/spark-1.5.0-bin-hadoop2.6")
.libPaths("/private/tmp/spark-1.5.0-bin-hadoop2.6/R/lib")
require('SparkR')
require('data.table')

sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.table(id = 1:n, val = rnorm(n))

Sys.time()
xs <- createDataFrame(sqlContext, x)
Sys.time()
Run Code Online (Sandbox Code Playgroud)

代码立即执行.但是,当我将其更改为n = 1000000大约需要4分钟(两次Sys.time()通话之间的时间).当我在端口:4040的控制台中检查这些作业时,作业n = 1000持续时间为0.2秒,作业为n = 10000000.3 秒.难道我做错了什么?

zer*_*323 5

你没有做任何特别错误的事情.它只是不同因素组合的结果:

  1. createDataFrame因为它目前(Spark 1.5.1)的实施速度很慢.这是SPARK-8277中描述的已知问题.
  2. 目前的实施并不适合data.table.
  3. 基础R相对较慢.聪明的人说这是一个功能而不是一个bug,但它仍然需要考虑.

在SPARK-8277解决之前,您可以做的事情并不多,但您可以尝试两种选择:

  • 使用普通老data.frame而不是data.table.使用航班数据集(227496行,14列):

    df <- read.csv("flights.csv")
    microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=3)
    
    ## Unit: seconds
    ##                             expr      min       lq     mean   median
    ##  createDataFrame(sqlContext, df) 96.41565 97.19515 99.08441 97.97465
    ##        uq      max neval
    ##  100.4188 102.8629     3
    
    Run Code Online (Sandbox Code Playgroud)

    相比 data.table

    dt <- data.table::fread("flights.csv")
    microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=3)
    
    ## Unit: seconds        
    ##                             expr      min       lq     mean  median
    ##  createDataFrame(sqlContext, dt) 378.8534 379.4482 381.2061 380.043
    ##        uq     max neval
    ##  382.3825 384.722     3
    
    Run Code Online (Sandbox Code Playgroud)
  • 写入磁盘并用于spark-csv直接将数据加载到Spark DataFrame而无需与R直接交互.听起来很疯狂:

    dt <- data.table::fread("flights.csv")
    
    write_and_read <- function() {
        write.csv(dt, tempfile(), row.names=FALSE)
        read.df(sqlContext, "flights.csv",
            source = "com.databricks.spark.csv",
            header = "true",
            inferSchema = "true"
        )
    }
    
    ## Unit: seconds
    ##              expr      min       lq     mean   median
    ##  write_and_read() 2.924142 2.959085 2.983008 2.994027
    ##       uq      max neval
    ##  3.01244 3.030854     3
    
    Run Code Online (Sandbox Code Playgroud)

我不确定在第一时间将可以在R中处理的数据推送到Spark是否真的有意义,但不要赘述.

编辑:

此问题应由Spark 1.6.0中的SPARK-11086解决.