pr3*_*338 0 python apache-spark pyspark spark-dataframe
代码:
w = Window().partitionBy("ticker").orderBy("date")
x = s_df.withColumn("daily_return", (col("close") - lag("close", 1).over(w)) / lag("close", 1).over(w))
Run Code Online (Sandbox Code Playgroud)
s_df 的样子:
+----------+------+------+------+------+--------+------+
| date| open| high| low| close| volume|ticker|
+----------+------+------+------+------+--------+------+
|2016-11-02| 111.4|112.35|111.23|111.59|28331709| AAPL|
|2016-11-01|113.46|113.77|110.53|111.49|43825812| AAPL|
|2016-10-31|113.65|114.23| 113.2|113.54|26419398| AAPL|
+----------+------+------+------+------+--------+------+
Run Code Online (Sandbox Code Playgroud)
那么 X 的样子:
+----------+--------------------+
| date| avg(daily_return)|
+----------+--------------------+
|2015-12-28|0.004124786535090563|
|2015-11-20|0.006992226387807268|
|2015-12-29| 0.01730500286123971|
Run Code Online (Sandbox Code Playgroud)
我想找到每组股票的 avg(daily_return) 的标准偏差。
我试过的:
x.agg(stddev("avg(daily_return)")).over(w)
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
AttributeError: 'DataFrame' object has no attribute 'over'
Run Code Online (Sandbox Code Playgroud)
我正在尝试做的事情是不可能的,还是有另一种方法可以做到?
小智 5
ticker是一个不同的维度,data因此您不能将这两个聚合在一起。你可以:
s_df_w_daily_rets = s_df.withColumn("daily_return",
(col("close") - lag("close", 1).over(w)) / lag("close", 1).over(w))
s_df_w_daily_rets.groupBy("date").agg(avg("daily_return"))
s_df_w_daily_rets.groupBy("ticker").agg(stddev("daily_return"))
Run Code Online (Sandbox Code Playgroud)
groupBy("date","ticker").agg(..) 没有意义,因为你只有一个股票代码date,ticker组和标准偏差将是未定义的。