Pyspark 列转换:计算列中每个组的百分比变化

shr*_*nar 5 pyspark

我在本地计算机上使用 Pyspark。我有一个包含 450 万行和大约 30,000 种不同股票的 Spark 数据框。我需要计算每只股票随时间变化的百分比。我已经运行了 orderBy,以便将所有股票分组在一起(如下例所示)。

下面是一个简化的示例数据框。

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
df.show()

**Company**     **Price**
Company_A         100
Company_A         103
Company_A         105
Company_A         107
Company_B          23
Company_B          25
Company_B          28
Company_B          30
Run Code Online (Sandbox Code Playgroud)

我想要的输出是这样的

**Company**     **Price**     **%_Change**
Company_A         100              0
Company_A         103              3%
Company_A         105              2%
Company_A         107              2%
Company_B          23              0
Company_B          25              9%
Company_B          28              12%
Company_B          30              7%
Run Code Online (Sandbox Code Playgroud)

诀窍(在我看来)是设置一个可以做两件事的代码:1)每次上市新股票时进行识别2)开始计算该股票的第二次观察的百分比变化,并继续计算百分比变化直到最后一次观察。它需要从第二次观察开始,因为在第二次观察发生之前不会出现百分比变化。

Waq*_*qas 6

您可以使用window操作来实现此目的,理想情况下您将使用idtimestamp来排序列。为了举例,我使用company作为排序键。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.read.csv("stock_price.txt", header=True, inferSchema=True)
price_window = Window.partitionBy("company").orderBy("company")
df = df.withColumn("prev_value", F.lag(df.price).over(price_window))
df = df.withColumn("diff", F.when(F.isnull(df.price - df.prev_value), 0).otherwise(df.price - df.prev_value))

+---------+-----+----------+----+
|  company|price|prev_value|diff|
+---------+-----+----------+----+
|Company_B|   23|      null|   0|
|Company_B|   25|        23|   2|
|Company_B|   28|        25|   3|
|Company_B|   30|        28|   2|
|Company_A|  100|      null|   0|
|Company_A|  103|       100|   3|
|Company_A|  105|       103|   2|
|Company_A|  107|       105|   2|
+---------+-----+----------+----+
Run Code Online (Sandbox Code Playgroud)