具有不同窗口规格的链式火花列表达式会产生低效的 DAG

pan*_*sen 9 python dataframe directed-acyclic-graphs apache-spark pyspark

语境

假设您处理时间序列数据。您想要的结果依赖于具有不同窗口规格的多个窗口函数。结果可能类似于单个火花列表达式,例如间隔标识符。

现状

通常,我不使用df.withColumn链式/堆栈列表达式来存储中间结果,并且相信 Spark 会找到最有效的 DAG(在处理 DataFrame 时)。

可重现的例子

但是,在以下示例(PySpark 2.4.4 独立版)中,存储中间结果df.withColumn降低了 DAG 的复杂性。让我们考虑以下测试设置:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),        
    }
)

df = spark.createDataFrame(dfp)
df.show(5)
Run Code Online (Sandbox Code Playgroud)
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),        
    }
)

df = spark.createDataFrame(dfp)
df.show(5)
Run Code Online (Sandbox Code Playgroud)

计算是任意的。基本上我们有 2 个窗口规格和 3 个计算步骤。3 个计算步骤相互依赖,并使用交替窗口规格:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)
Run Code Online (Sandbox Code Playgroud)

通过查看物理计划df_result.explain()显示 4 个交换和排序!但是,这里只需要 3 个,因为我们只更改了两次窗口规范。

df_result.explain()
Run Code Online (Sandbox Code Playgroud)
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   4|   1|
|   0|   2|   3|   0|
|   2|   0|   1|   0|
|   4|   1|   1|   2|
|   1|   3|   0|   4|
+----+----+----+----+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

改进

为了获得更好的 DAG,我们稍微修改代码以存储step2with的列表达式,withColumn并只传递此列的引用。新的逻辑计划确实只需要 3 次 shuffle!

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# save temporary
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()
Run Code Online (Sandbox Code Playgroud)
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)
Run Code Online (Sandbox Code Playgroud)

关联

我的原始示例更加复杂,导致 DAG 的差异更大(在真实世界数据上最多慢 10 倍)

有没有人对这种奇怪的行为有答案?我认为堆叠/链接列表达式是最佳实践,因为它允许 Spark 最有效地优化中间步骤(与为中间结果创建引用相反)。

小智 0

如果我们查看分析的逻辑计划, (by=df_result.explain(True))我们可以看到,虽然我们没有tmp_variable,但由于**lazy evaluation**创建逻辑计划过程中的数据集/数据帧/表,分析器假设该列存在(惰性)对该列执行分析。由于这个假设,现在需要比之前的情况少构建 2 个腋窗才能达到相同的结果。实际上,通过遵循解析的逻辑计划,(windowspecdefinition)我们看到分析器在创建位置时需要构建较少的未评估窗口,tmp_variable而不是以其下推方式构建窗口,它主要执行简单的项目(选择)。