Is sample_n really a random sample when used with sparklyr?

kpu*_*hko 5 random r dplyr apache-spark sparklyr

I have 500 million rows in a Spark data frame. I'm interested in sample_n from dplyr because it lets me explicitly specify the sample size I want. If I were to use sparklyr::sdf_sample() instead, I would first have to compute sdf_nrow(), then derive the fraction of the data sample_size / nrow, and then pass that fraction to sdf_sample(). That's not a big deal, but sdf_nrow() takes a while to complete.
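For reference, the workaround looks roughly like this (a sketch; note that sdf_sample() draws each row independently, so the fraction only yields an approximate sample size):

n <- 300
total <- sdf_nrow(spark_data)   # this count is the slow step
spark_data %>%
  sdf_sample(fraction = n / total, replacement = FALSE)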

So using dplyr::sample_n() directly would be ideal. However, after some testing, sample_n() does not appear to be random. In fact, the results are identical to head()! It would be a major problem if, instead of sampling rows at random, the function simply returned the first n rows.

Can someone confirm this? Is sdf_sample() my best option?

# install.packages("gapminder")

library(gapminder)
library(sparklyr)
library(purrr)

sc <- spark_connect(master = "yarn-client")

spark_data <- sdf_import(gapminder, sc, "gapminder")


> # Appears to be random
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    58.83397


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    60.31693


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.38692
> 
> 
> # Appears to be random
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    60.48903


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.44187


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.27986
> 
> 
> # Does not appear to be random
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434
> 
> 
> 
> # === Test sample_n() ===
> sample_mean <- list()
> 
> for(i in 1:20){
+   
+   sample_mean[i] <- spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp)) %>% collect() %>% pull()
+   
+ }
> 
> 
> sample_mean %>% flatten_dbl() %>% mean()
[1] 57.78434
> sample_mean %>% flatten_dbl() %>% sd()
[1] 0
> 
> 
> # === Test head() ===
> spark_data %>% 
+   head(300) %>% 
+   pull(lifeExp) %>% 
+   mean()
[1] 57.78434

zer*_*323 6

It is not. If you check the execution plan (using the optimizedPlan function as defined here), you'll see that it is just a limit:
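For reference, a minimal version of such a helper, assuming sparklyr's invoke() API (a sketch, not necessarily identical to the function from the linked answer):

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%          # underlying Java Dataset object
    invoke("queryExecution") %>%   # Spark's QueryExecution for this query
    invoke("optimizedPlan")        # the optimized logical plan
}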

spark_data %>% sample_n(300) %>% optimizedPlan()
<jobj[168]>
  org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
  GlobalLimit 300
+- LocalLimit 300
   +- InMemoryRelation [country#151, continent#152, year#153, lifeExp#154, pop#155, gdpPercap#156], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `gapminder`
         +- Scan ExistingRDD[country#151,continent#152,year#153,lifeExp#154,pop#155,gdpPercap#156]

This is further confirmed by show_query:

spark_data %>% sample_n(300) %>% show_query()

and by the visualized execution plan:

TABLESAMPLE(n ROWS) plan

Finally, if you check the Spark source code, you'll see that this case is implemented with a simple LIMIT.


I believe this semantics was inherited from Hive, where the equivalent query takes the first n rows from each input split.

In practice, getting a sample of an exact size is simply very expensive, and you should avoid it unless strictly necessary (the same goes for large LIMITs).
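If a (near) exact n is still required, one common compromise is to oversample by fraction and then cap the result. A sketch, assuming an approximate row count is already known; the helper name and the 1.5x oversampling factor are illustrative, not part of sparklyr:

# Draw a Bernoulli sample slightly larger than n, then keep at most n rows.
# Cheap compared to an exact-size sample, but it can return fewer than n
# rows if the oversampled draw comes up short.
sample_about_n <- function(sdf, n, total_rows) {
  frac <- min(1, 1.5 * n / total_rows)   # oversample by 50%
  sdf %>%
    sdf_sample(fraction = frac, replacement = FALSE) %>%
    head(n)
}

sample_about_n(spark_data, 300, 1704)    # gapminder has 1704 rows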