如何使用包“sparklyr”在R中实现lapply函数

cic*_*ioz 0 parallel-processing r mclapply apache-spark sparklyr

我对 Spark 很陌生,我试图在网上寻找一些东西,但我没有找到任何令人满意的东西。

我一直使用该命令运行并行计算mclapply,我喜欢它的结构(即,第一个参数用作滚动索引,第二个参数是要并行化的函数,然后是传递给函数的其他可选参数)。现在我试图通过 Spark 做同样的事情,即,我想在 Spark 集群的所有节点之间分配我的计算。这就是我所学到的以及我认为应该如何构建代码的内容(我正在使用包sparklyr):

  1. 我使用命令创建到 Spark 的连接spark_connect
  2. 我在 Spark 环境中复制我的 data.framecopy_to并通过它的tibble访问它;
  3. 我想实现的“星火友好”的版本mclapply,但我已经看到有在包(我已经看到了存在的功能没有类似功能spark.lapplySparkR包,但不幸的是它是不是在CRAN了)。

下面是我实现的一个简单的测试脚本,它使用函数mclapply.

#### Standard code that works with mclapply #########
dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

.testFunc = function(X = 1, df, str) {
    rowSelected = df[X, ]
    y = as.numeric(rowSelected[1] + rowSelected[2])
    return(list(y = y, str = str))
}

lOutput = mclapply(X = 1 : nrow(dfTest), FUN = .testFunc, df = dfTest, 
                   str = "useless string", mc.cores = 2)

######################################################

###### Similar code that should work with Spark ######
library(sparklyr)
sc = spark_connect(master = "local")

dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))

.testFunc = function(X = 1, df, str) {
  rowSelected = df[X, ]
  nSum = as.numeric(rowSelected[1] + rowSelected[2])
  return(list(nSum = nSum, str = str))
}

dfTest_tbl = copy_to(sc, dfTest, "test_tbl", overwrite = TRUE)

# Apply similar function mclapply to dfTest_tbl, that works with 
# Spark
# ???
######################################################
Run Code Online (Sandbox Code Playgroud)

如果有人已经为此找到了解决方案,那就太好了。其他参考资料/指南/链接也非常受欢迎。谢谢!

小智 5

闪闪发光的

spark_apply 是您正在寻找的现有功能:

spark_apply(sdf, function(data) {
   ...
})
Run Code Online (Sandbox Code Playgroud)

请参考分布式[Rsparklyr文档的详细信息。

星火

与 SparkR 一起使用gapply/gapplyCollect

gapply(df, groupingCols, function(data) {...} schema)
Run Code Online (Sandbox Code Playgroud)

dapply / dapplyCollect

dapply(df, function(data) {...}, schema)
Run Code Online (Sandbox Code Playgroud)

UDF。参考

详情。

请注意,与原生 Spark 代码相比,所有解决方案都较差,在需要高性能时应避免使用。