标签: sparklyr

SparkR vs sparklyr

有人对SparkR vs sparklyr的优缺点有所概述吗?谷歌没有产生任何令人满意的结果,两者看起来非常相似.尝试两种方式,SparkR看起来更麻烦,而sparklyr非常简单(既可以安装也可以使用,特别是使用dplyr输入).sparklyr只能用于并行运行dplyr函数或"普通"R代码吗?

最好

r apache-spark sparkr sparklyr

51
推荐指数
4
解决办法
2万
查看次数

从 JPEG 创建 Spark 对象并在非翻译函数上使用 spark_apply()

通常,当人们想要sparklyr在自定义函数( ** 非翻译函数)上使用时,他们会将它们放在spark_apply(). 然而,我只遇到例子,其中一个单一的本地数据帧或者是copy_to()spark_read_csv()到远程数据源,然后使用spark_apply()它。一个示例,仅用于说明目的:

library(sparklyr)
sc <- spark_connect(master = "local")

n_sim = 100
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
  sapply(rep.int, times=n_sim) %>% cbind(replicate = rep(1:n_sim, each = 50)) %>% 
  data.frame() %>%
  dplyr::group_by(replicate) %>%
  dplyr::sample_n(50, replace = TRUE)

iris_samps_tbl <- copy_to(sc, iris_samps)

iris_samps_tbl %>% 
  spark_apply(function(x) {mean(x$Petal_Length)}, 
    group_by = "replicate") %>%
  ggplot(aes(x = result)) + geom_histogram(bins = 20) + ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")
Run Code Online (Sandbox Code Playgroud)

因此,只要数据驻留在 …

binary jpeg r sparklyr

15
推荐指数
0
解决办法
199
查看次数

如何使用Sparklyr包来展平不同数据类型的数据?

介绍

R代码是使用Sparklyr包编写的,用于创建数据库模式.[给出了可重现的代码和数据库]

现有结果

root
|-- contributors : string
|-- created_at : string
|-- entities (struct)
|     |-- hashtags (array) : [string]
|     |-- media (array)
|     |     |-- additional_media_info (struct)
|     |     |       |-- description : string
|     |     |       |-- embeddable : boolean
|     |     |       |-- monetizable : bollean
|     |     |-- diplay_url : string
|     |     |-- id : long
|     |     |-- id_str : string
|     |-- urls (array)     
|-- extended_entities (struct)
|-- retweeted_status (struct) …
Run Code Online (Sandbox Code Playgroud)

nested r flatten apache-spark sparklyr

14
推荐指数
1
解决办法
1392
查看次数

从Spark群集中收集数据时出现内存不足错误

我知道有很多关于Spark的内存错误的问题,但我还没有找到解决方案.

我有一个简单的工作流程:

  1. 从Amazon S3读入ORC文件
  2. filter 下到一小部分行
  3. select 一小部分列
  4. collect进入驱动程序节点(所以我可以做其他操作R)

当我运行上面然后cache表格来激发内存它需要<2GB - 与我的集群可用的内存相比很小 - 然后当我尝试collect将数据发送到我的驱动程序节点时,我收到OOM错误.

我试过运行以下设置:

  • 具有32核和244GB内存的计算机上的本地模式
  • 独立模式,具有10 x 6.2 GB执行程序和61 GB驱动程序节点

对于这些我都玩过的多种配置executor.memory,driver.memory以及driver.maxResultSize覆盖全系列我的可用内存中的可能值,但始终我结束了一个内存不足的错误的collect阶段; 或者 java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded或者 Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.(sparklyr表示存储器问题的错误).

根据我对Spark的[有限]理解,在收集之前缓存一个表应强制进行所有计算 - 即如果表在缓存<2GB后快乐地坐在内存中,那么我不需要超过2GB的内存来收集它进入驱动程序节点.

请注意,这个问题的答案有一些我尚未尝试的建议,但这些可能会影响性能(例如序列化RDD),所以如果可能的话,我们希望避免使用.

我的问题:

  1. 如何在缓存后占用如此少空间的数据帧会导致内存问题?
  2. 在我继续讨论可能影响性能的其他选项之前,有什么显而易见的东西可以检查/更改/排除故障以帮助解决问题吗?

谢谢

编辑:请注意以下@ Shaido的评论,cache通过Sparklyr 调用"通过执行count(*)表格来强制数据加载到内存中"[来自Sparklyr文档] - …

memory apache-spark sparklyr

13
推荐指数
1
解决办法
8466
查看次数

Matrix数学与Sparklyr

希望将一些R代码转换为Sparklyr,例如lmtest :: coeftest()和sandwich :: sandwich().尝试开始使用Sparklyr扩展但对Spark API来说很新并且遇到问题:(

运行Spark 2.1.1和sparklyr 0.5.5-9002

感觉第一步是使用linalg库制作DenseMatrix对象:

library(sparklyr)
library(dplyr)
sc <- spark_connect("local")

rows <- as.integer(2)
cols <- as.integer(2)
array <- c(1,2,3,4)

mat <- invoke_new(sc, "org.apache.spark.mllib.linalg.DenseMatrix", 
                  rows, cols, array)
Run Code Online (Sandbox Code Playgroud)

这会导致错误:

Error: java.lang.Exception: No matched constructor found for class org.apache.spark.mllib.linalg.DenseMatrix
Run Code Online (Sandbox Code Playgroud)

好的,所以我得到了一个java lang异常,我很确定构造函数中的args rowscolsargs很好,但不确定最后一个,它应该是java Array.所以我尝试了几种排列:

array <- invoke_new(sc, "java.util.Arrays", c(1,2,3,4))
Run Code Online (Sandbox Code Playgroud)

但最终得到类似的错误信息......

Error: java.lang.Exception: No matched constructor found for class java.util.Arrays
Run Code Online (Sandbox Code Playgroud)

我觉得我错过了一些非常基本的东西.谁知道怎么了?

r apache-spark apache-spark-mllib sparklyr

12
推荐指数
1
解决办法
644
查看次数

R 和 Sparklyr:为什么简单的查询这么慢?

这是我的代码。我在数据块中运行它。

library(sparklyr)
library(dplyr)
library(arrow)

sc <- spark_connect(method = "databricks")
tbl_change_db(sc, "prod")
trip_ids <- spark_read_table(sc, "signals",memory=F) %>% 
            slice_sample(10) %>% 
            pull(trip_identifier)
Run Code Online (Sandbox Code Playgroud)

尽管我只查询 10 个样本,但该代码非常慢,需要几个小时才能运行。这是为什么?有没有办法提高性能?

谢谢你!

r apache-spark sparklyr

10
推荐指数
1
解决办法
583
查看次数

有没有办法用sparklyr处理嵌套数据?

在下面的示例中,我加载了一个镶木地板文件,其中包含该meta字段中地图对象的嵌套记录.sparklyr似乎在处理这些方面做得很好.但是tidyr::unnest,不会转换为SQL(或HQL - 可以理解 - 就像LATERAL VIEW explode()),因此无法使用.有没有办法以其他方式取消数据?

tfl <- head(tf)
tfl
Source:   query [?? x 10]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

                            trkKey             meta     sources startTime
                             <chr>           <list>      <list>    <list>
1 3juPe-k0yiMcANNMa_YiAJfJyU7WCQ3Q <S3: spark_jobj> <list [24]> <dbl [1]>
2 3juPe-k0yiAJX3ocJj1fVqru-e0syjvQ <S3: spark_jobj>  <list [1]> <dbl [1]>
3 3juPe-k0yisY7UY_ufUPUo5mE1xGfmNw <S3: spark_jobj>  <list [7]> <dbl [1]>
4 3juPe-k0yikXT5FhqNj87IwBw1Oy-6cw <S3: spark_jobj> <list [24]> <dbl [1]>
5 3juPe-k0yi4MMU63FEWYTNKxvDpYwsRw <S3: spark_jobj>  <list [7]> <dbl [1]>
6 3juPe-k0yiFBz2uPbOQqKibCFwn7Fmlw …
Run Code Online (Sandbox Code Playgroud)

r tidyr sparklyr

9
推荐指数
1
解决办法
1140
查看次数

同时使用 SparkR 和 Sparklyr

据我了解,这两个包为 Apache Spark 提供了相似但主要不同的包装函数。Sparklyr 较新,但仍需要在功能范围内增长。因此,我认为目前需要同时使用这两个软件包才能获得完整的功能范围。

由于这两个包本质上都包装了对 Scala 类的 Java 实例的引用,因此我猜应该可以并行使用这些包。但实际上有可能吗?你的最佳实践是什么?

r apache-spark sparkr sparklyr

9
推荐指数
1
解决办法
349
查看次数

通过重复调用内存中的数据帧来减速

假设我有40个连续(DoubleType)变量,我已经使用了四分位数ft_quantile_discretizer.识别所有变量的四分位数非常快,因为该函数支持一次执行多个变量.

接下来,我想要一个热门代码那些分段变量,但是目前没有一个热代码支持所有这些变量的功能.所以我通过循环遍历变量,一次一个地管道ft_string_indexer,ft_one_hot_encodersdf_separate_column为每个分段变量.这可以完成工作.但是,随着循环的进行,它会大大减慢.我认为它的内存不足,但无法弄清楚如何编程,以便它以相同的速度在变量上执行.

如果q_vars是连续变量的变量名称(例如40个)的字符数组,我该如何以更加火花的方式对其进行编码?

for (v in q_vars) {
   data_sprk_q<-data_sprk_q %>% 
       ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
       ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
       sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]]) 
}
Run Code Online (Sandbox Code Playgroud)

我也尝试将所有引用的变量作为单个大型管道执行,但这也没有解决问题,所以我认为它与循环本身没有任何关系.

test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
Run Code Online (Sandbox Code Playgroud)

任何帮助,将不胜感激.

r apache-spark apache-spark-ml sparklyr

9
推荐指数
1
解决办法
246
查看次数

spark:java.io.IOException:设备上没有剩余空间[再次!]

java.io.IOException: No space left on device在运行一个简单的查询后得到了这个sparklyr.我使用最后版本的Spark(2.1.1)和Sparklyr

df_new <-spark_read_parquet(sc, "/mypath/parquet_*", name = "df_new", memory = FALSE)

myquery <- df_new %>% group_by(text) %>% summarize(mycount = n()) %>% 
  arrange(desc(mycount)) %>% head(10)

#this FAILS
get_result <- collect(myquery)
Run Code Online (Sandbox Code Playgroud)

我确实设置了两个

  • spark.local.dir <- "/mypath/"
  • spark.worker.dir <- "/mypath/"

使用平常

config <- spark_config()

config$`spark.executor.memory` <- "100GB"
config$`spark.executor.cores` <- "3"
config$`spark.local.dir` <- "/mypath/"
config$`spark.worker.dir` <- "mypath/"
config$`spark.cores.max`<- "2000"
config$`spark.default.parallelism`<- "4"
config$`spark.total-executor-cores`<- "80"
config$`sparklyr.shell.driver-memory` <- "100G"
config$`sparklyr.shell.executor-memory` <- "100G"
config$`spark.yarn.executor.memoryOverhead` <- "100G"
config$`sparklyr.shell.num-executors` <- …
Run Code Online (Sandbox Code Playgroud)

r apache-spark pyspark sparklyr

8
推荐指数
2
解决办法
2846
查看次数