Flu*_*uxy 12 python apache-spark apache-spark-sql pyspark
这是我在 PySpark 中的数据框:
utc_timestamp data feed
2015-10-13 11:00:00+00:00 1 A
2015-10-13 12:00:00+00:00 5 A
2015-10-13 13:00:00+00:00 6 A
2015-10-13 14:00:00+00:00 10 B
2015-10-13 15:00:00+00:00 11 B
Run Code Online (Sandbox Code Playgroud)
的值data是累积的。
我想得到这个结果(连续行之间的差异,按 分组feed):
utc_timestamp data feed
2015-10-13 11:00:00+00:00 1 A
2015-10-13 12:00:00+00:00 4 A
2015-10-13 13:00:00+00:00 1 A
2015-10-13 14:00:00+00:00 10 B
2015-10-13 15:00:00+00:00 1 B
Run Code Online (Sandbox Code Playgroud)
我会这样做pandas:
df["data"] -= (df.groupby("feed")["data"].shift(fill_value=0))
Run Code Online (Sandbox Code Playgroud)
我如何在 PySpark 中做同样的事情?
Sha*_*per 21
您可以使用带有窗口的滞后函数来做到这一点:
from pyspark.sql.window import Window
import pyspark.sql.functions as f
window = Window.partitionBy("feed").orderBy("utc_timestamp")
df = df.withColumn("data", f.col("data") - f.lag(f.col("data"), 1, 0).over(window))
Run Code Online (Sandbox Code Playgroud)
您可以用作lag的替代品shift和coalesce( , F.lit(0))的替代品fill_value=0
from pyspark.sql.window import Window
import pyspark.sql.functions as F
window = Window.partitionBy("feed").orderBy("utc_timestamp")
data = F.col("data") - F.coalesce(F.lag(F.col("data")).over(window), F.lit(0))
df.withColumn("data", data)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13763 次 |
| 最近记录: |