从 sparklyr 编写一个与 spark_apply() 一起使用的函数

Kre*_*igs 5 r rstudio dplyr sparklyr

test <- data.frame('prod_id'= c("shoe", "shoe", "shoe", "shoe", "shoe", "shoe", "boat", "boat","boat","boat","boat","boat"), 
               'seller_id'= c("a", "b", "c", "d", "e", "f", "a","g", "h", "r", "q", "b"), 
               'Dich'= c(1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0),
               'price' = c(120, 20, 10, 4, 3, 4, 30, 43, 56, 88, 75, 44)
                )
test

       prod_id seller_id Dich price
 1     shoe         a    1   120
 2     shoe         b    0    20
 3     shoe         c    0    10
 4     shoe         d    0     4
 5     shoe         e    0     3
 6     shoe         f    0     4
 7     boat         a    0    30
 8     boat         g    0    43
 9     boat         h    1    56
10     boat         r    0    88
11     boat         q    0    75
12     boat         b    0    44
Run Code Online (Sandbox Code Playgroud)

我想创建一个新列,该列根据 Dich 的值计算 price 列中观察值之间的差异,其中每个观察值与每个 prod_id 组中 Dich==1 的观察值不同。这样做的语法如下。

test %>% 
group_by(prod_id) %>% 
mutate(diff_p = if(any(Dich ==1)) price - price[Dich == 1] else NA)

       prod_id seller_id Dich price diff_p
 1     shoe         a    1   120      0
 2     shoe         b    0    20     -100
 3     shoe         c    0    10     -110
 4     shoe         d    0     4     -116
 5     shoe         e    0     3     -117
 6     shoe         f    0     4     -116
 7     boat         a    0    30     -26
 8     boat         g    0    43     -13
 9     boat         h    1    56       0
10     boat         r    0    88      32
11     boat         q    0    75      19
12     boat         b    0    44     -12
Run Code Online (Sandbox Code Playgroud)

现在我想创建一个使用相同语法的函数,我可以在新数据帧上使用该函数并使用 sparklyr::spark_apply() 获得相同的结果。

trans <- function(e) {e %>%
         group_by(prod_id) %>% 
         mutate(diff_p = if(any(Dich ==1)) price -price[Dich == 1] else NA)
         }
Run Code Online (Sandbox Code Playgroud)

在他们的网站上,rstudio 讨论了如何使用 R 函数来激发对象。

https://spark.rstudio.com/guides/distributed-r/

这是一个缩放 spark 数据帧所有列的函数示例。

 trees_tbl %>%
 spark_apply(function(e) scale(e))
Run Code Online (Sandbox Code Playgroud)

我想知道如何以解释用于 spark_apply() 的格式编写上述函数。如果您能解释如何在函数中包含 e,将会很有帮助 - e 代表什么?

小智 2

所有的包都需要在worker中并且需要找到函数(但是%>%需要你告诉workerlibrary(magrittr)),一种可行的方法是:

\n
trans <- function(e) {\n    library(magrittr)\n\n    e %>%\n        dplyr::group_by(prod_id) %>% \n        dplyr::mutate(diff_p = if(any(Dich ==1)) price -price[Dich == 1] else NA)\n}\n\nsparklyr::spark_apply(\n  x = test_sf, \n  f = trans)\n# Source: spark<?> [?? x 5]\n   prod_id seller_id  Dich price diff_p\n   <chr>   <chr>     <dbl> <dbl>  <dbl>\n 1 shoe    a             1   120      0\n 2 shoe    b             0    20   -100\n 3 shoe    c             0    10   -110\n 4 shoe    d             0     4   -116\n 5 shoe    e             0     3   -117\n 6 shoe    f             0     4   -116\n 7 boat    a             0    30    -26\n 8 boat    g             0    43    -13\n 9 boat    h             1    56      0\n10 boat    r             0    88     32\n# \xe2\x80\xa6 with more rows\n# \xe2\x84\xb9 Use `print(n = ...)` to see more rows\n
Run Code Online (Sandbox Code Playgroud)\n