Pyspark - 如何回填数据帧?

Adr*_*rie 6 pandas pyspark spark-dataframe pyspark-sql

你怎么能做与df.fillna(method='bfill')带有 的熊猫数据框相同的事情pyspark.sql.DataFrame

pyspark 数据框具有该pyspark.sql.DataFrame.fillna方法,但不支持method参数。


在 Pandas 中,您可以使用以下内容回填时间序列:

创建数据

import pandas as pd

index = pd.date_range('2017-01-01', '2017-01-05')
data = [1, 2, 3, None, 5]

df = pd.DataFrame({'data': data}, index=index)
Run Code Online (Sandbox Code Playgroud)

给予

Out[1]:
            data
2017-01-01  1.0
2017-01-02  2.0
2017-01-03  3.0
2017-01-04  NaN
2017-01-05  5.0
Run Code Online (Sandbox Code Playgroud)

回填数据框

df = df.fillna(method='bfill')
Run Code Online (Sandbox Code Playgroud)

生成回填框架

Out[2]:
            data
2017-01-01  1.0
2017-01-02  2.0
2017-01-03  3.0
2017-01-04  5.0
2017-01-05  5.0
Run Code Online (Sandbox Code Playgroud)

如何为 a 做同样的事情pyspark.sql.DataFrame

Joh*_*roh 6

last函数first及其ignorenulls=True标志可以与窗口结合起来rowsBetween。如果我们想向后填充,我们选择当前行和末尾之间的第一个非空值。如果我们想向前填充,我们选择起始行和当前行之间的最后一个非空值。

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
import sys

df.withColumn(
  'data',
  F.first(
    F.col('data'),
    ignorenulls=True
  ) \
    .over(
      W.orderBy('date').rowsBetween(0, sys.maxsize)
    )
  )
Run Code Online (Sandbox Code Playgroud)

填充 Spark 的来源:https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9


Mar*_*usz 4

实际上,分布式数据集的回填并不像 pandas(本地)数据帧那样简单 - 您无法确定要填充的值存在于同一分区中。我将结合使用 crossJoin 和窗口,例如 DF:

df = spark.createDataFrame([
    ('2017-01-01', None), 
    ('2017-01-02', 'B'), 
    ('2017-01-03', None), 
    ('2017-01-04', None), 
    ('2017-01-05', 'E'), 
    ('2017-01-06', None), 
    ('2017-01-07', 'G')], ['date', 'value'])
df.show()

+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01| null|
|2017-01-02|    B|
|2017-01-03| null|
|2017-01-04| null|
|2017-01-05|    E|
|2017-01-06| null|
|2017-01-07|    G|
+----------+-----+
Run Code Online (Sandbox Code Playgroud)

代码是:

from pyspark.sql.window import Window

df.alias('a').crossJoin(df.alias('b')) \
    .where((col('b.date') >= col('a.date')) & (col('a.value').isNotNull() | col('b.value').isNotNull())) \
    .withColumn('rn', row_number().over(Window.partitionBy('a.date').orderBy('b.date'))) \
    .where(col('rn') == 1) \
    .select('a.date', coalesce('a.value', 'b.value').alias('value')) \
    .orderBy('a.date') \
    .show()

+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01|    B|
|2017-01-02|    B|
|2017-01-03|    E|
|2017-01-04|    E|
|2017-01-05|    E|
|2017-01-06|    G|
|2017-01-07|    G|
+----------+-----+
Run Code Online (Sandbox Code Playgroud)