Nyx*_*nyx 9 python python-2.7 apache-spark apache-spark-sql
我是新来使用Python中Spark和一直未能解决这个问题:运行后groupBy在pyspark.sql.dataframe.DataFrame
df = sqlsc.read.json("data.json")
df.groupBy('teamId')
Run Code Online (Sandbox Code Playgroud)
如何N从每个结果组中选择随机样本(按teamId分组)而无需替换?
我基本上试图N从每个团队中选择随机用户,也许使用groupBy错误开始?
zer*_*323 16
嗯,这有点不对劲.GroupedData并不是真正设计用于数据访问.它只描述了分组标准并提供了聚合方法.请参阅我在Spark中使用groupBy并回到DataFrame以获取更多详细信息的答案.
这个想法的另一个问题是选择N random samples.这是一项很难在没有数据心理分组的情况下并行实现的任务,当你call在以下情况下分组时,它不会发生DataFrame:
至少有两种方法可以解决这个问题:
转换为RDD,groupBy并执行本地采样
import random
n = 3
def sample(iter, n):
rs = random.Random() # We should probably use os.urandom as a seed
return rs.sample(list(iter), n)
df = sqlContext.createDataFrame(
[(x, y, random.random()) for x in (1, 2, 3) for y in "abcdefghi"],
("teamId", "x1", "x2"))
grouped = df.rdd.map(lambda row: (row.teamId, row)).groupByKey()
sampled = sqlContext.createDataFrame(
grouped.flatMap(lambda kv: sample(kv[1], n)))
sampled.show()
## +------+---+-------------------+
## |teamId| x1| x2|
## +------+---+-------------------+
## | 1| g| 0.81921738561455|
## | 1| f| 0.8563875814036598|
## | 1| a| 0.9010425238735935|
## | 2| c| 0.3864428179837973|
## | 2| g|0.06233470405822805|
## | 2| d|0.37620872770129155|
## | 3| f| 0.7518901502732027|
## | 3| e| 0.5142305439671874|
## | 3| d| 0.6250620479303716|
## +------+---+-------------------+
Run Code Online (Sandbox Code Playgroud)使用窗口函数
from pyspark.sql import Window
from pyspark.sql.functions import col, rand, rowNumber
w = Window.partitionBy(col("teamId")).orderBy(col("rnd_"))
sampled = (df
.withColumn("rnd_", rand()) # Add random numbers column
.withColumn("rn_", rowNumber().over(w)) # Add rowNumber over windw
.where(col("rn_") <= n) # Take n observations
.drop("rn_") # drop helper columns
.drop("rnd_"))
sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 1| i| 0.8173912535268248|
## | 2| h| 0.10862995810038856|
## | 2| c| 0.3864428179837973|
## | 2| a| 0.6695356657072442|
## | 3| b|0.012329360826023095|
## | 3| a| 0.6450777858109182|
## | 3| e| 0.5142305439671874|
## +------+---+--------------------+
Run Code Online (Sandbox Code Playgroud)但我担心两者都会相当昂贵.如果单个组的大小是平衡的并且相对较大,我将简单地使用DataFrame.randomSplit.
如果组的数量相对较小,则可以尝试其他方法:
from pyspark.sql.functions import count, udf
from pyspark.sql.types import BooleanType
from operator import truediv
counts = (df
.groupBy(col("teamId"))
.agg(count("*").alias("n"))
.rdd.map(lambda r: (r.teamId, r.n))
.collectAsMap())
# This defines fraction of observations from a group which should
# be taken to get n values
counts_bd = sc.broadcast({k: truediv(n, v) for (k, v) in counts.items()})
to_take = udf(lambda k, rnd: rnd <= counts_bd.value.get(k), BooleanType())
sampled = (df
.withColumn("rnd_", rand())
.where(to_take(col("teamId"), col("rnd_")))
.drop("rnd_"))
sampled.show()
## +------+---+--------------------+
## |teamId| x1| x2|
## +------+---+--------------------+
## | 1| d| 0.14815204548854788|
## | 1| f| 0.8563875814036598|
## | 1| g| 0.81921738561455|
## | 2| a| 0.6695356657072442|
## | 2| d| 0.37620872770129155|
## | 2| g| 0.06233470405822805|
## | 3| b|0.012329360826023095|
## | 3| h| 0.9022527556458557|
## +------+---+--------------------+
Run Code Online (Sandbox Code Playgroud)
在Spark 1.5+中,您可以udf使用sampleBy方法调用替换:
df.sampleBy("teamId", counts_bd.value)
Run Code Online (Sandbox Code Playgroud)
只要每组的观察数量足够大以获得适当的样本,它就不会给出确切的观察数量,但在大多数情况下应该足够好.您也可以sampleByKey以类似的方式在RDD上使用.
Ita*_*chi 15
我发现这是一个更多的数据框,而不是进入 rdd 方式。
您可以使用window函数在组内创建排名,其中排名可以随机以适合您的情况。然后,您可以根据(N)每组所需的样本数进行过滤
window_1 = Window.partitionBy(data['teamId']).orderBy(F.rand())
data_1 = data.select('*', F.rank().over(window_1).alias('rank')).filter(F.col('rank') <= N).drop('rank')
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6619 次 |
| 最近记录: |