Stratified sampling with PySpark

use*_*256 3 apache-spark apache-spark-sql pyspark

I have a Spark DataFrame that has one column with lots of zeros and very few ones (only 0.01% are ones).

I'd like to take a random subsample, but a stratified one, so that it keeps the ratio of 1s to 0s in that column.

Is it possible to do this in PySpark?

I am looking for a non-Scala solution, and one based on DataFrames rather than RDDs.

eli*_*sah 23

The solution suggested in Stratified sampling in Spark is pretty straightforward to convert from Scala to Python (and even to Java - What's the easiest way to stratify a Spark Dataset?).

Nevertheless, I'll rewrite it in Python. Let's start by creating a toy DataFrame:

from pyspark.sql.functions import lit

# toy data: (x1, x2, x3) tuples
data = [(2147481832, 23355149, 1), (2147481832, 973010692, 1), (2147481832, 2134870842, 1),
        (2147481832, 541023347, 1), (2147481832, 1682206630, 1), (2147481832, 1138211459, 1),
        (2147481832, 852202566, 1), (2147481832, 201375938, 1), (2147481832, 486538879, 1),
        (2147481832, 919187908, 1), (214748183, 919187908, 1), (214748183, 91187908, 1)]
df = spark.createDataFrame(data, ["x1", "x2", "x3"])
df.show()
# +----------+----------+---+
# |        x1|        x2| x3|
# +----------+----------+---+
# |2147481832|  23355149|  1|
# |2147481832| 973010692|  1|
# |2147481832|2134870842|  1|
# |2147481832| 541023347|  1|
# |2147481832|1682206630|  1|
# |2147481832|1138211459|  1|
# |2147481832| 852202566|  1|
# |2147481832| 201375938|  1|
# |2147481832| 486538879|  1|
# |2147481832| 919187908|  1|
# | 214748183| 919187908|  1|
# | 214748183|  91187908|  1|
# +----------+----------+---+

As you can see, this DataFrame has 12 elements:

df.count()
# 12

Its distribution is as follows:

df.groupBy("x1").count().show()
# +----------+-----+
# |        x1|count|
# +----------+-----+
# |2147481832|   10|
# | 214748183|    2|
# +----------+-----+

Now let's sample:

First, we'll set the seed:

seed = 12

Then find the keys to fraction by, and sample:

fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
print(fractions)                                                            
# {2147481832: 0.8, 214748183: 0.8}
sampled_df = df.stat.sampleBy("x1", fractions, seed)
sampled_df.show()
# +----------+---------+---+
# |        x1|       x2| x3|
# +----------+---------+---+
# |2147481832| 23355149|  1|
# |2147481832|973010692|  1|
# |2147481832|541023347|  1|
# |2147481832|852202566|  1|
# |2147481832|201375938|  1|
# |2147481832|486538879|  1|
# |2147481832|919187908|  1|
# | 214748183|919187908|  1|
# | 214748183| 91187908|  1|
# +----------+---------+---+

We can now check the content of our sample:

sampled_df.count()
# 9

sampled_df.groupBy("x1").count().show()
# +----------+-----+
# |        x1|count|
# +----------+-----+
# |2147481832|    7|
# | 214748183|    2|
# +----------+-----+
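Note that sampleBy keeps each row independently with the given probability, which is why the 2147481832 stratum came back with 7 rows rather than exactly 8. If exact per-stratum sizes matter, here is a sketch of an alternative using a window function (the _rank and _n column names are just illustrative, not from the original answer):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank rows in a random order within each stratum, then keep the top 80% of each.
w = Window.partitionBy("x1").orderBy(F.rand(seed))
exact_sample = (df.withColumn("_rank", F.row_number().over(w))
                  .withColumn("_n", F.count("*").over(Window.partitionBy("x1")))
                  .filter(F.col("_rank") <= F.ceil(0.8 * F.col("_n")))
                  .drop("_rank", "_n"))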

  • @eliasah Is there any way to get both the 0.8 and the 0.2 fractions? I want to use the 0.8 as a training set and the other 0.2 as a test set. I managed to get the 0.8 with this approach, but had difficulty getting the other 0.2 in Spark 1.6, since there is no subquery support there. (2 upvotes)
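Regarding the comment above: on Spark 2.0+, a left anti join gives the remaining ~20% without subqueries. A sketch, assuming the full set of columns uniquely identifies a row:

# Rows of df that did not make it into sampled_df form the remaining ~20%.
test_df = df.join(sampled_df, on=df.columns, how="left_anti")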

Ank*_*rma 16

Assume you have a huge dataset in a 'data' DataFrame that you want to split into train and test sets, using stratified sampling based on the 'Survived' target variable.

# Check the initial distribution of 0's and 1's
data.groupBy("Survived").count().show()
# +--------+-----+
# |Survived|count|
# +--------+-----+
# |       1|  342|
# |       0|  549|
# +--------+-----+

# Take 70% of both the 0's and the 1's into the training set
train = data.sampleBy("Survived", fractions={0: 0.7, 1: 0.7}, seed=10)

# Subtract 'train' from the original 'data' to get the test set
test = data.subtract(train)

# Check the distributions of 0's and 1's in the train and test sets after sampling
train.groupBy("Survived").count().show()
# +--------+-----+
# |Survived|count|
# +--------+-----+
# |       1|  239|
# |       0|  399|
# +--------+-----+

test.groupBy("Survived").count().show()
# +--------+-----+
# |Survived|count|
# +--------+-----+
# |       1|  103|
# |       0|  150|
# +--------+-----+
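One caveat: subtract() behaves like SQL's EXCEPT DISTINCT, so duplicate rows in 'data' would be silently dropped from the test set. A sketch that sidesteps this by tagging rows with a unique id first (the row_id name is just illustrative):

from pyspark.sql import functions as F

# Tag every row with a unique id so the anti-join removes exactly
# the sampled rows, even if 'data' contains duplicate rows.
data_with_id = data.withColumn("row_id", F.monotonically_increasing_id())
train = data_with_id.sampleBy("Survived", fractions={0: 0.7, 1: 0.7}, seed=10)
test = data_with_id.join(train, on="row_id", how="left_anti")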


yea*_*c21 7

This can be done easily with 'randomSplit' and 'union' in PySpark.

# read in data
df = spark.read.csv(file, header=True)
# split dataframes between 0s and 1s
zeros = df.filter(df["Target"]==0)
ones = df.filter(df["Target"]==1)
# split datasets into training and testing
train0, test0 = zeros.randomSplit([0.8,0.2], seed=1234)
train1, test1 = ones.randomSplit([0.8,0.2], seed=1234)
# stack datasets back together
train = train0.union(train1)
test = test0.union(test1)
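As a quick sanity check, the share of 1s should come out roughly equal across the full, train, and test DataFrames, since each class was split separately. A minimal sketch, reusing the names from the snippet above:

# Compare the proportion of 1s in each DataFrame; the three numbers
# should be close, because the 0s and 1s were split independently.
for name, d in [("full", df), ("train", train), ("test", test)]:
    print(name, d.filter(d["Target"] == 1).count() / d.count())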


Vin*_*aes 5

This is based on the accepted answer from @eliasah and this thread.

If you want to get a train and a test set back, you can use the following function:

from pyspark.sql import functions as F 

def stratified_split_train_test(df, frac, label, join_on, seed=42):
    """ stratfied split of a dataframe in train and test set.
    inspiration gotten from:
    /sf/answers/3337063551/
    /sf/answers/2792248441/"""
    fractions = df.select(label).distinct().withColumn("fraction", F.lit(frac)).rdd.collectAsMap()
    df_frac = df.stat.sampleBy(label, fractions, seed)
    df_remaining = df.join(df_frac, on=join_on, how="left_anti")
    return df_frac, df_remaining

Create the stratified train and test sets, where 80% of the total goes to the train set:

df_train, df_test = stratified_split_train_test(df=df, frac=0.8, label="y", join_on="unique_id")
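To verify the stratification, compare the label distributions of the two returned DataFrames:

# Each distinct value of "y" should appear in df_train and df_test
# in roughly an 80/20 ratio.
df_train.groupBy("y").count().show()
df_test.groupBy("y").count().show()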