I'm working in a Jupyter notebook with Pandas, but when I use Spark I want to do the transformations and computations with Spark DataFrames rather than Pandas. Please help me convert these computations to Spark DataFrames or RDDs.
The DataFrame:
df =
+--------+-------+---------+--------+
| userId | item | price | value |
+--------+-------+---------+--------+
| 169 | I0111 | 5300 | 1 |
| 169 | I0973 | 70 | 1 |
| 336 | C0174 | 455 | 1 |
| 336 | I0025 | 126 | 1 |
| 336 | I0973 | 4 | 1 |
| 770963 | B0166 | 2 | 1 |
| 1294537| I0110 | 90 | 1 |
+--------+-------+---------+--------+
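For reference, a minimal sketch (values copied from the table above) to reproduce df in Pandas; for the Spark part, spark.createDataFrame(df) on an active SparkSession would give the equivalent Spark DataFrame:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'userId': [169, 169, 336, 336, 336, 770963, 1294537],
    'item':   ['I0111', 'I0973', 'C0174', 'I0025', 'I0973', 'B0166', 'I0110'],
    'price':  [5300, 70, 455, 126, 4, 2, 90],
    'value':  [1, 1, 1, 1, 1, 1, 1],
})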
1. Computing with Pandas:
(1) userItem = df.groupby(['userId'])['item'].nunique()
The result is a Series indexed by userId whose values are the distinct-item counts:
+---------+--------+
| userId  | n_item |
+---------+--------+
| 169     | 2      |
| 336     | 3      |
| 770963  | 1      |
| 1294537 | 1      |
+---------+--------+
2. Using multiplication:
data_sum = df.groupby(['userId', 'item'])['value'].sum()    # result is a Series
average_played = np.mean(userItem)                          # result is a scalar
(2) weighted_games_played = data_sum * (average_played / userItem)
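To make the arithmetic concrete with the sample data: userItem is (2, 3, 1, 1), so average_played = (2 + 3 + 1 + 1) / 4 = 1.75, and the row (userId=169, item=I0111) gets weighted_games_played = 1 * (1.75 / 2) = 0.875. Pandas lines up data_sum's MultiIndex with userItem on the shared userId level to perform the division.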
Please help me perform (1) and (2) on Spark with Spark DataFrames and their operators.
You can achieve (1) with the following:
import pyspark.sql.functions as f

# number of distinct items per user
userItem = df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))
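Equivalently, pyspark.sql.functions provides a countDistinct helper, so the same aggregation can be written without a SQL expression string:

userItem = df.groupby('userId').agg(f.countDistinct('item').alias('n_item'))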
For (2):
# per-user, per-item sum of value
data_sum = df.groupby(['userId', 'item']).agg(f.sum('value').alias('sum_value'))
# global average of the distinct-item counts (a single-row DataFrame)
average_played = userItem.agg(f.mean('n_item').alias('avg_played'))
# attach each user's n_item, then broadcast the one-row average to every row
data_sum = data_sum.join(userItem, on='userId').crossJoin(average_played)
data_sum = data_sum.withColumn("weighted_games_played", f.expr("sum_value * avg_played / n_item"))
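As a variant (a sketch, not part of the original answer): because average_played has exactly one row, you can also collect the average into a Python scalar and skip the crossJoin:

avg = userItem.agg(f.mean('n_item')).first()[0]
data_sum = (df.groupby('userId', 'item')
              .agg(f.sum('value').alias('sum_value'))
              .join(userItem, on='userId')
              .withColumn('weighted_games_played',
                          f.col('sum_value') * avg / f.col('n_item')))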