How to select an equal-size stratified sample from a dataframe in Apache Spark?

Maj*_*ifi 8 apache-spark pyspark spark-dataframe

I have a dataframe in Spark 2, shown below, where users have between 50 and thousands of posts. I would like to create a new dataframe that keeps all users from the original dataframe, but with only 5 randomly sampled posts per user.

+--------+--------------+--------------------+
| user_id|       post_id|                text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+

Something like posts.groupby('user_id').agg(sample('post_id')) would be ideal, but there is no such function in pyspark.

Any suggestions?

Update:

This question differs from another closely related question, Stratified sampling in Spark, in two ways:

  1. It asks about disproportionate stratified sampling, rather than the common proportionate method in that other question.
  2. It asks about doing this with Spark's Dataframe API rather than with RDDs.

I have also updated the question's title to clarify this.

TDr*_*bas 11

You can use the sampleBy(...) method on DataFrames: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Here is a working example:

import numpy as np
import string
import random

# needed by the commented-out balanced-sample variant below
import pyspark.sql.functions as func

# generate some fake data
p = [(
    str(int(e)), 
    ''.join(
        random.choice(
            string.ascii_uppercase + string.digits) 
        for _ in range(10)
    )
) for e in np.random.normal(10, 1, 10000)]

posts = spark.createDataFrame(p, ['label', 'val'])

# define the sample size
percent_back = 0.05

# use this if you want an (almost) exact number of samples
# sample_count = 200
# percent_back = sample_count / posts.count()

frac = dict(
    (e.label, percent_back) 
    for e 
    in posts.select('label').distinct().collect()
)

# use this if you want an (almost) balanced sample
# f = posts.groupby('label').count()
#
# f_min_count can also be set to an exact number, e.g. f_min_count = 5,
# as long as it is less than the minimum post count per user
# calculated over all users
#
# alternatively, take the minimum post count:
# f_min_count = f.select('count').agg(func.min('count').alias('minVal')).collect()[0].minVal
#
# f = f.withColumn('frac', f_min_count / func.col('count'))
# frac = dict(f.select('label', 'frac').collect())

# sample the data
sampled = posts.sampleBy('label', fractions=frac)

# compare the original counts with sampled
original_total_count = posts.count()
original_counts = posts.groupby('label').count()
original_counts = original_counts \
    .withColumn('count_perc', 
                original_counts['count'] / original_total_count)

sampled_total_count = sampled.count()
sampled_counts = sampled.groupBy('label').count()
sampled_counts = sampled_counts \
    .withColumn('count_perc', 
                sampled_counts['count'] / sampled_total_count)


print(original_counts.sort('label').show(100))
print(sampled_counts.sort('label').show(100))

print(sampled_total_count)
print(sampled_total_count / original_total_count)
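The per-key fraction semantics of sampleBy can be illustrated outside Spark with a small pure-Python sketch (the helper sample_by below is a hypothetical stand-in, not a Spark API): each row is kept independently with probability fractions[key], so per-key counts are only approximately equal to the requested fraction.

```python
import random

def sample_by(rows, key_fn, fractions, seed=None):
    """Keep each row independently with probability fractions[key],
    mirroring the semantics of DataFrame.sampleBy."""
    rng = random.Random(seed)
    return [r for r in rows if rng.random() < fractions.get(key_fn(r), 0.0)]

# two users with 1000 posts each; keep ~5% of each user's posts
rows = [(user, post) for user in ("a", "b") for post in range(1000)]
frac = {"a": 0.05, "b": 0.05}
sampled = sample_by(rows, lambda r: r[0], frac, seed=42)
# per-user counts land near 50, but are not exactly 50
```

This is why the answer below is useful when you need an exact per-user sample size.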


Bus*_*ero 5

Using sampleBy gives an approximate solution. Here is an alternative approach that is a bit more hacky than the one above, but always yields exactly the same sample size.

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

df.withColumn("row_num", row_number().over(
    Window.partitionBy($"user_id").orderBy($"something_random")))
  .filter($"row_num" <= 5)  // keep exactly 5 posts per user
  .drop("row_num")

If you don't already have a random column, you can use org.apache.spark.sql.functions.rand to create a column with random values to guarantee your random sampling.
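The idea behind this exact-size approach (row_number over a randomly ordered window, then keeping the first n rows per partition) can be sketched in plain Python without Spark; exact_stratified_sample is a hypothetical helper for illustration only:

```python
import random
from collections import defaultdict

def exact_stratified_sample(rows, key_fn, n, seed=None):
    """Group rows by key, shuffle each group, and keep the first n --
    the same idea as row_number() over a randomly ordered window."""
    rng = random.Random(seed)
    groups = defaultdict(list)
    for r in rows:
        groups[key_fn(r)].append(r)
    out = []
    for grp in groups.values():
        rng.shuffle(grp)          # random order within each group
        out.extend(grp[:n])       # exactly n rows per group
    return out

# three users with 50 posts each; keep exactly 5 posts per user
rows = [(user, post) for user in ("a", "b", "c") for post in range(50)]
sampled = exact_stratified_sample(rows, lambda r: r[0], 5, seed=0)
# returns exactly 5 rows per user, 15 rows total
```

Unlike the fraction-based sampleBy, this always returns exactly n rows per group (or the whole group if it has fewer than n rows).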