Maj*_*ifi asked (apache-spark, pyspark, spark-dataframe):
I have a DataFrame in Spark 2 like the one below, where users have anywhere from 50 to thousands of posts. I would like to create a new DataFrame that contains all the users from the original one, but with only 5 randomly sampled posts per user.
+--------+--------------+--------------------+
| user_id| post_id| text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+
Something like posts.groupby('user_id').agg(sample('post_id')) would do, but there is no such function in pyspark.
Any suggestions?
Update:
This question is different from another closely related question, Stratified sampling in Spark, in two ways:
I have also updated the question's title to clarify this.
TDr*_*bas answered:
You can use the .sampleBy(...) method of DataFrames: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy
Here is a working example:
import random
import string

import numpy as np
from pyspark.sql import functions as func  # needed by the commented-out balanced-sample variant below

# generate some fake data
p = [(
        str(int(e)),
        ''.join(
            random.choice(string.ascii_uppercase + string.digits)
            for _ in range(10)
        )
    ) for e in np.random.normal(10, 1, 10000)]

posts = spark.createDataFrame(p, ['label', 'val'])

# define the sample size
percent_back = 0.05

# use this if you want an (almost) exact number of samples
# sample_count = 200
# percent_back = sample_count / posts.count()

frac = dict(
    (e.label, percent_back)
    for e
    in posts.select('label').distinct().collect()
)

# use this if you want an (almost) balanced sample
# f = posts.groupby('label').count()
# f_min_count can also be specified as an exact number
# e.g. f_min_count = 5
# as long as it is less than the minimum count of posts per user
# calculated over all the users
# alternatively, you can take the minimum post count
# f_min_count = f.select('count').agg(func.min('count').alias('minVal')).collect()[0].minVal
# f = f.withColumn('frac', f_min_count / func.col('count'))
# frac = dict(f.select('label', 'frac').collect())

# sample the data
sampled = posts.sampleBy('label', fractions=frac)

# compare the original counts with the sampled ones
original_total_count = posts.count()
original_counts = posts.groupby('label').count()
original_counts = original_counts \
    .withColumn('count_perc',
                original_counts['count'] / original_total_count)

sampled_total_count = sampled.count()
sampled_counts = sampled.groupBy('label').count()
sampled_counts = sampled_counts \
    .withColumn('count_perc',
                sampled_counts['count'] / sampled_total_count)

original_counts.sort('label').show(100)
sampled_counts.sort('label').show(100)
print(sampled_total_count)
print(sampled_total_count / original_total_count)
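A small usage note beyond the answer above: sampleBy also accepts an optional seed argument, so if the sampled set needs to be reproducible across runs you can pin it. A minimal variant of the sampling step, reusing the posts and frac names from the example (the seed value is arbitrary):

# reproducible variant of the sampling step above
sampled = posts.sampleBy('label', fractions=frac, seed=17)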
Using sampleBy will give you an approximate solution. Here is an alternative approach that is a little more hacky than the one above, but always results in exactly the same sample size.
import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

df.withColumn("row_num", row_number().over(Window.partitionBy($"user_id").orderBy($"something_random")))
If you don't already have a random ID, you can use org.apache.spark.sql.functions.rand to create a column with random values to guarantee your random sampling.
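Since the question asks about pyspark, here is a minimal PySpark sketch of the same window-function idea applied to the question's posts DataFrame; the 5-posts-per-user cutoff comes from the question, and the seed value is an arbitrary assumption:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank each user's posts in a random order (rand() supplies the random ordering column)
w = Window.partitionBy('user_id').orderBy(F.rand(seed=42))

# keep exactly 5 posts per user; users with fewer than 5 posts keep all of theirs
sampled = (posts
           .withColumn('row_num', F.row_number().over(w))
           .where(F.col('row_num') <= 5)
           .drop('row_num'))

Unlike sampleBy, this never returns more than 5 rows per user, at the cost of a shuffle for the window.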