在Spark / Python中向前填充缺少的值

use*_*577 6 hadoop apache-spark pyspark spark-dataframe apache-spark-mllib

我试图用以前的非空值(如果存在)填充我的Spark数据框中的缺失值。我已经在Python / Pandas中完成了这类工作,但是我的数据对于Pandas(在一个小型集群上)来说太大了,我是Spark noob。这是Spark可以做的吗?可以为多列使用吗?如果是这样,怎么办?如果没有,在谁的Hadoop工具套件中对替代方法有何建议?

谢谢!

Rom*_*ler 6

我已经找到了一种解决方案,该解决方案无需在这里使用Window即可进行编码。所以杰夫是正确的,有一个解决方案。完整的代码介绍,我将简要解释它的作用,有关更多详细信息,请参阅博客。

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)

# do the fill 
spark_df_filled = df6.withColumn('temperature_filled',  filled_column_temperature)
Run Code Online (Sandbox Code Playgroud)

因此,我们的想法是通过始终包含实际行和所有先前行的数据定义一个窗口滑动(此处有更多滑动窗口):

    window = Window.orderBy('time')\
           .rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)

请注意,我们按时间排序,因此数据顺序正确。另请注意,使用“ -sys.maxsize”可确保窗口始终包含所有先前的数据,并且在自上而下遍历数据时会不断增长,但是可能会有更有效的解决方案。

使用“ last”功能,我们总是在该窗口中寻址最后一行。通过传递“ ignorenulls = True”,我们定义了如果当前行为null,则该函数将返回窗口中的最新(最后一个)非null值。否则,将使用实际行的值。

做完了

  • 最好使用 `Window.unboundedPreceding` 而不是 `-sys.maxsize` https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=column#pyspark.sql.Window (6认同)