有人对SparkR vs sparklyr的优缺点有所概述吗?谷歌没有产生任何令人满意的结果,两者看起来非常相似.尝试两种方式,SparkR看起来更麻烦,而sparklyr非常简单(既可以安装也可以使用,特别是使用dplyr输入).sparklyr只能用于并行运行dplyr函数或"普通"R代码吗?
最好
通常,当人们想要sparklyr在自定义函数(即 ** 非翻译函数)上使用时,他们会将它们放在spark_apply(). 然而,我只遇到例子,其中一个单一的本地数据帧或者是copy_to()或spark_read_csv()到远程数据源,然后使用spark_apply()它。一个示例,仅用于说明目的:
library(sparklyr)
sc <- spark_connect(master = "local")
n_sim = 100
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
sapply(rep.int, times=n_sim) %>% cbind(replicate = rep(1:n_sim, each = 50)) %>%
data.frame() %>%
dplyr::group_by(replicate) %>%
dplyr::sample_n(50, replace = TRUE)
iris_samps_tbl <- copy_to(sc, iris_samps)
iris_samps_tbl %>%
spark_apply(function(x) {mean(x$Petal_Length)},
group_by = "replicate") %>%
ggplot(aes(x = result)) + geom_histogram(bins = 20) + ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")
Run Code Online (Sandbox Code Playgroud)
因此,只要数据驻留在 …
介绍
R代码是使用Sparklyr包编写的,用于创建数据库模式.[给出了可重现的代码和数据库]
现有结果
root
|-- contributors : string
|-- created_at : string
|-- entities (struct)
| |-- hashtags (array) : [string]
| |-- media (array)
| | |-- additional_media_info (struct)
| | | |-- description : string
| | | |-- embeddable : boolean
| | | |-- monetizable : bollean
| | |-- diplay_url : string
| | |-- id : long
| | |-- id_str : string
| |-- urls (array)
|-- extended_entities (struct)
|-- retweeted_status (struct) …Run Code Online (Sandbox Code Playgroud) 我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.
我有一个简单的工作流程:
filter 下到一小部分行 select 一小部分列collect进入驱动程序节点(所以我可以做其他操作R)当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.
我试过运行以下设置:
对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者
java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.(sparklyr表示存储器问题的错误).
根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.
请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.
我的问题:
谢谢
编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …
希望将一些R代码转换为Sparklyr,例如lmtest :: coeftest()和sandwich :: sandwich().尝试开始使用Sparklyr扩展但对Spark API来说很新并且遇到问题:(
运行Spark 2.1.1和sparklyr 0.5.5-9002
感觉第一步是使用linalg库制作DenseMatrix对象:
library(sparklyr)
library(dplyr)
sc <- spark_connect("local")
rows <- as.integer(2)
cols <- as.integer(2)
array <- c(1,2,3,4)
mat <- invoke_new(sc, "org.apache.spark.mllib.linalg.DenseMatrix",
rows, cols, array)
Run Code Online (Sandbox Code Playgroud)
这会导致错误:
Error: java.lang.Exception: No matched constructor found for class org.apache.spark.mllib.linalg.DenseMatrix
Run Code Online (Sandbox Code Playgroud)
好的,所以我得到了一个java lang异常,我很确定构造函数中的args rows和colsargs很好,但不确定最后一个,它应该是java Array.所以我尝试了几种排列:
array <- invoke_new(sc, "java.util.Arrays", c(1,2,3,4))
Run Code Online (Sandbox Code Playgroud)
但最终得到类似的错误信息......
Error: java.lang.Exception: No matched constructor found for class java.util.Arrays
Run Code Online (Sandbox Code Playgroud)
我觉得我错过了一些非常基本的东西.谁知道怎么了?
这是我的代码。我在数据块中运行它。
library(sparklyr)
library(dplyr)
library(arrow)
sc <- spark_connect(method = "databricks")
tbl_change_db(sc, "prod")
trip_ids <- spark_read_table(sc, "signals",memory=F) %>%
slice_sample(10) %>%
pull(trip_identifier)
Run Code Online (Sandbox Code Playgroud)
尽管我只查询 10 个样本,但该代码非常慢,需要几个小时才能运行。这是为什么?有没有办法提高性能?
谢谢你!
在下面的示例中,我加载了一个镶木地板文件,其中包含该meta字段中地图对象的嵌套记录.sparklyr似乎在处理这些方面做得很好.但是tidyr::unnest,不会转换为SQL(或HQL - 可以理解 - 就像LATERAL VIEW explode()),因此无法使用.有没有办法以其他方式取消数据?
tfl <- head(tf)
tfl
Source: query [?? x 10]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
trkKey meta sources startTime
<chr> <list> <list> <list>
1 3juPe-k0yiMcANNMa_YiAJfJyU7WCQ3Q <S3: spark_jobj> <list [24]> <dbl [1]>
2 3juPe-k0yiAJX3ocJj1fVqru-e0syjvQ <S3: spark_jobj> <list [1]> <dbl [1]>
3 3juPe-k0yisY7UY_ufUPUo5mE1xGfmNw <S3: spark_jobj> <list [7]> <dbl [1]>
4 3juPe-k0yikXT5FhqNj87IwBw1Oy-6cw <S3: spark_jobj> <list [24]> <dbl [1]>
5 3juPe-k0yi4MMU63FEWYTNKxvDpYwsRw <S3: spark_jobj> <list [7]> <dbl [1]>
6 3juPe-k0yiFBz2uPbOQqKibCFwn7Fmlw …Run Code Online (Sandbox Code Playgroud) 据我了解,这两个包为 Apache Spark 提供了相似但主要不同的包装函数。Sparklyr 较新,但仍需要在功能范围内增长。因此,我认为目前需要同时使用这两个软件包才能获得完整的功能范围。
由于这两个包本质上都包装了对 Scala 类的 Java 实例的引用,因此我猜应该可以并行使用这些包。但实际上有可能吗?你的最佳实践是什么?
假设我有40个连续(DoubleType)变量,我已经使用了四分位数ft_quantile_discretizer.识别所有变量的四分位数非常快,因为该函数支持一次执行多个变量.
接下来,我想要一个热门代码那些分段变量,但是目前没有一个热代码支持所有这些变量的功能.所以我通过循环遍历变量,一次一个地管道ft_string_indexer,ft_one_hot_encoder并sdf_separate_column为每个分段变量.这可以完成工作.但是,随着循环的进行,它会大大减慢.我认为它的内存不足,但无法弄清楚如何编程,以便它以相同的速度在变量上执行.
如果q_vars是连续变量的变量名称(例如40个)的字符数组,我该如何以更加火花的方式对其进行编码?
for (v in q_vars) {
data_sprk_q<-data_sprk_q %>%
ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]])
}
Run Code Online (Sandbox Code Playgroud)
我也尝试将所有引用的变量作为单个大型管道执行,但这也没有解决问题,所以我认为它与循环本身没有任何关系.
test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
Run Code Online (Sandbox Code Playgroud)
任何帮助,将不胜感激.
我java.io.IOException: No space left on device在运行一个简单的查询后得到了这个sparklyr.我使用最后版本的Spark(2.1.1)和Sparklyr
df_new <-spark_read_parquet(sc, "/mypath/parquet_*", name = "df_new", memory = FALSE)
myquery <- df_new %>% group_by(text) %>% summarize(mycount = n()) %>%
arrange(desc(mycount)) %>% head(10)
#this FAILS
get_result <- collect(myquery)
Run Code Online (Sandbox Code Playgroud)
我确实设置了两个
spark.local.dir <- "/mypath/"spark.worker.dir <- "/mypath/"使用平常
config <- spark_config()
config$`spark.executor.memory` <- "100GB"
config$`spark.executor.cores` <- "3"
config$`spark.local.dir` <- "/mypath/"
config$`spark.worker.dir` <- "mypath/"
config$`spark.cores.max`<- "2000"
config$`spark.default.parallelism`<- "4"
config$`spark.total-executor-cores`<- "80"
config$`sparklyr.shell.driver-memory` <- "100G"
config$`sparklyr.shell.executor-memory` <- "100G"
config$`spark.yarn.executor.memoryOverhead` <- "100G"
config$`sparklyr.shell.num-executors` <- …Run Code Online (Sandbox Code Playgroud)