我在本地计算机上使用 Pyspark。我有一个包含 450 万行和大约 30,000 种不同股票的 Spark 数据框。我需要计算每只股票随时间变化的百分比。我已经运行了 orderBy,以便将所有股票分组在一起(如下例所示)。
下面是一个简化的示例数据框。
df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
df.show()
**Company** **Price**
Company_A 100
Company_A 103
Company_A 105
Company_A 107
Company_B 23
Company_B 25
Company_B 28
Company_B 30
Run Code Online (Sandbox Code Playgroud)
我想要的输出是这样的
**Company** **Price** **%_Change**
Company_A 100 0
Company_A 103 3%
Company_A 105 2%
Company_A 107 2%
Company_B 23 0
Company_B 25 9%
Company_B 28 12%
Company_B 30 7%
Run Code Online (Sandbox Code Playgroud)
诀窍(在我看来)是设置一个可以做两件事的代码:1)每次上市新股票时进行识别2)开始计算该股票的第二次观察的百分比变化,并继续计算百分比变化直到最后一次观察。它需要从第二次观察开始,因为在第二次观察发生之前不会出现百分比变化。
您可以使用window操作来实现此目的,理想情况下您将使用id或timestamp来排序列。为了举例,我使用company作为排序键。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
price_window = Window.partitionBy("company").orderBy("company")
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))
df = df.withColumn("diff", F.when(F.isnull(df.price - df.prev_value), 0).otherwise(df.price - df.prev_value))
+---------+-----+----------+----+
| company|price|prev_value|diff|
+---------+-----+----------+----+
|Company_B| 23| null| 0|
|Company_B| 25| 23| 2|
|Company_B| 28| 25| 3|
|Company_B| 30| 28| 2|
|Company_A| 100| null| 0|
|Company_A| 103| 100| 3|
|Company_A| 105| 103| 2|
|Company_A| 107| 105| 2|
+---------+-----+----------+----+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8520 次 |
| 最近记录: |