cic*_*ioz 0 parallel-processing r mclapply apache-spark sparklyr
我对 Spark 很陌生,我试图在网上寻找一些东西,但我没有找到任何令人满意的东西。
我一直使用该命令运行并行计算mclapply,我喜欢它的结构(即,第一个参数用作滚动索引,第二个参数是要并行化的函数,然后是传递给函数的其他可选参数)。现在我试图通过 Spark 做同样的事情,即,我想在 Spark 集群的所有节点之间分配我的计算。这就是我所学到的以及我认为应该如何构建代码的内容(我正在使用包sparklyr):
spark_connect;copy_to并通过它的tibble访问它;mclapply,但我已经看到有在包(我已经看到了存在的功能没有类似功能spark.lapply的SparkR包,但不幸的是它是不是在CRAN了)。下面是我实现的一个简单的测试脚本,它使用函数mclapply.
#### Standard code that works with mclapply #########
dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))
.testFunc = function(X = 1, df, str) {
rowSelected = df[X, ]
y = as.numeric(rowSelected[1] + rowSelected[2])
return(list(y = y, str = str))
}
lOutput = mclapply(X = 1 : nrow(dfTest), FUN = .testFunc, df = dfTest,
str = "useless string", mc.cores = 2)
######################################################
###### Similar code that should work with Spark ######
library(sparklyr)
sc = spark_connect(master = "local")
dfTest = data.frame(X = rep(1, 10000), Y = rep(2, 10000))
.testFunc = function(X = 1, df, str) {
rowSelected = df[X, ]
nSum = as.numeric(rowSelected[1] + rowSelected[2])
return(list(nSum = nSum, str = str))
}
dfTest_tbl = copy_to(sc, dfTest, "test_tbl", overwrite = TRUE)
# Apply similar function mclapply to dfTest_tbl, that works with
# Spark
# ???
######################################################
Run Code Online (Sandbox Code Playgroud)
如果有人已经为此找到了解决方案,那就太好了。其他参考资料/指南/链接也非常受欢迎。谢谢!
小智 5
闪闪发光的
spark_apply 是您正在寻找的现有功能:
spark_apply(sdf, function(data) {
...
})
Run Code Online (Sandbox Code Playgroud)
请参考分布式[R在sparklyr文档的详细信息。
星火
与 SparkR 一起使用gapply/gapplyCollect
gapply(df, groupingCols, function(data) {...} schema)
Run Code Online (Sandbox Code Playgroud)
dapply / dapplyCollect
dapply(df, function(data) {...}, schema)
Run Code Online (Sandbox Code Playgroud)
UDF。参考
详情。
请注意,与原生 Spark 代码相比,所有解决方案都较差,在需要高性能时应避免使用。
| 归档时间: |
|
| 查看次数: |
1414 次 |
| 最近记录: |