Flu*_*uxy 3 python pyspark pyspark-dataframes
I have the following DataFrame in PySpark:
Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  null                 null                 1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  null                 null                 101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187
I need to replace the null values to get the following result:
Id   DateActual           DateStart            DateEnd              SourceCode
107  2019-08-11 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
107  2019-08-16 00:00:00  2019-08-11 00:00:00  2019-08-18 00:00:00  1111
128  2019-02-11 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  101
128  2019-02-13 00:00:00  2019-02-11 00:00:00  2019-02-18 00:00:00  168
128  2019-02-14 00:00:00  2019-02-13 00:00:00  2019-02-20 00:00:00  187
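Basically, where DateStart and DateEnd are null, they should take the DateStart and DateEnd values of the NEXT row, provided that row has the same Id.
How can I fill the null values in PySpark following this logic?
The DataFrame: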
df = (
    sc.parallelize([
        (107, "2019-08-11 00:00:00", None, None, 1111),
        (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
        (128, "2019-02-11 00:00:00", None, None, 101),
        # DateEnd matches the table above (2019-02-18)
        (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-18 00:00:00", 168),
        (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
    ]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)
Here is what I tried:
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F
from pyspark.sql.window import Window
my_window = Window.partitionBy("Id").orderBy("DateActual")
df.withColumn("DateStart_start", when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window)).otherwise(col("DateStart"))).show()
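I don't need something like df.na.fill(0). I need to replace the null values with the NEXT ROW's values, which probably means using lag or some other similar function.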
Use first from pyspark.sql.functions:
from pyspark.sql import Window
from pyspark.sql.functions import first
spark_df = df  # the DataFrame from the question
# define the window: within each Id, from the current row to the last row
window = (
    Window.partitionBy('Id')
    .orderBy('DateActual')
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)
# define the back-filled columns: first non-null value at or after the current row
filled_column_start = first(spark_df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(spark_df['DateEnd'], ignorenulls=True).over(window)
# do the fill
spark_df_filled = spark_df.withColumn('filled_start', filled_column_start)
spark_df_filled = spark_df_filled.withColumn('filled_end', filled_column_end)
# show off our glorious achievements
spark_df_filled.orderBy('Id').show(10)
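To check what this window computes without starting Spark, here is a rough plain-Python sketch of the same back-fill, run on the question's rows. The helper name backfill_by_id is made up for illustration and is not part of PySpark. It only mimics "first non-null value at or after the current row, within the same Id, ordered by DateActual". The DateEnd of the second Id 128 row is assumed to be 2019-02-18, as in the question's table.

```python
from itertools import groupby
from operator import itemgetter

# Rows from the question: (Id, DateActual, DateStart, DateEnd, SourceCode)
rows = [
    (107, "2019-08-11 00:00:00", None, None, 1111),
    (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
    (128, "2019-02-11 00:00:00", None, None, 101),
    (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-18 00:00:00", 168),
    (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187),
]


def backfill_by_id(rows):
    """Hypothetical helper: back-fill DateStart/DateEnd within each Id."""
    result = []
    # Sort by (Id, DateActual), mirroring partitionBy('Id').orderBy('DateActual')
    ordered = sorted(rows, key=itemgetter(0, 1))
    for _, group in groupby(ordered, key=itemgetter(0)):
        filled = []
        next_start = next_end = None
        # Walk the partition backwards, remembering the nearest non-null
        # value seen so far for each column independently
        for id_, actual, start, end, code in reversed(list(group)):
            if start is not None:
                next_start = start
            if end is not None:
                next_end = end
            filled.append((id_, actual, next_start, next_end, code))
        # Restore ascending DateActual order for this partition
        result.extend(reversed(filled))
    return result


for row in backfill_by_id(rows):
    print(row)
```

Note that this logic, like first with ignorenulls=True, skips over several consecutive null rows until it finds a value. If only the immediately following row should be used, lead from pyspark.sql.functions (rather than lag, which looks at the previous row) would be the closer match.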