为什么连接结构相同的数据帧会产生不同的结果?

Zyg*_*ygD 7 join apache-spark apache-spark-sql pyspark

更新:根本问题是Spark 3.2.0 中修复的一个错误。


两次运行中的输入 df 结构相同,但输出不同。只有第二次运行才返回所需的结果 ( df6)。我知道我可以使用数据帧的别名来返回所需的结果。

问题。Spark 创建的底层机制是什么df3?Spark读取df1.c1 == df2.c2join'son子句,但很明显它没有关注提供的dfs。引擎盖下有什么?如何预见此类行为?

第一次运行(错误df3结果):

data = [
    (1, 'bad', 'A'),
    (4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()

#+----+------+----+----+----+------+----+----+
#|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
#+----+------+----+----+----+------+----+----+
#|   4|    ok|null|   A|null|  null|null|null|
#|null|  null|null|null|   1|   bad|   A|   A|
#|null|  null|null|null|   4|    ok|null|   A|
#+----+------+----+----+----+------+----+----+
Run Code Online (Sandbox Code Playgroud)

第二次运行(正确df6结果):

data = [
    (1, 'bad', 'A', 'A'),
    (4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()

#+----+------+----+----+---+------+----+---+
#|  ID|Status|  c1|  c2| ID|Status|  c1| c2|
#+----+------+----+----+---+------+----+---+
#|null|  null|null|null|  4|    ok|null|  A|
#|   4|    ok|null|   A|  1|   bad|   A|  A|
#+----+------+----+----+---+------+----+---+
Run Code Online (Sandbox Code Playgroud)

我可以看到物理计划的不同之处在于内部使用了不同的联接(BroadcastNestedLoopJoinSortMergeJoin)。但这本身并不能解释为什么结果不同,因为对于不同的内部连接类型,结果应该仍然相同。

df3.explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
:  +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
:     +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
   +- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
      +- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]

df6.explain()

== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
:     +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
:        +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
      +- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
Run Code Online (Sandbox Code Playgroud)

Vin*_*oba 4

连接取决于连接数据帧的结构,但构建这些数据帧的方式也会产生影响。如果您加入的两个数据帧共享相同的lineage,则连接条件中可能会出现不明确的列问题,从而导致您在问题中描述的内容。

在第一次运行中,当您df2从构建时df1,两个数据帧共享相同的谱系。当您连接这两个数据帧时,实际上是在进行自连接,Spark 选择仅属于其中一个连接数据帧的错误列作为连接条件,从而产生笛卡尔积,后跟始终为 false 的过滤器。

在第二次运行中,由于两个数据帧是独立构建的,因此连接条件是通过两列之间的相等性正确定义的,每列属于不同的数据帧。因此 Spark 执行经典的连接。


详细解释

正如pltc在他的回答中所解释的那样,在第一次运行时 Spark 不会为您的连接选择正确的列。让我们找出原因。

引擎盖下是什么?

df1让我们从获取和df2使用的物理计划开始explain。这是物理计划df1

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)

这是物理计划df2

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
   +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)

您可以在第一行中看到两个数据框(1) Project开头,并具有相同的列名和 id:。这并不奇怪,因为是从创建的,所以您可以看到附加的转换。所以我们有以下参考:df1df2 [ID#0L, Status#1, c1#2, A AS c2#6]df2df1df2df1

  • df1.c1< df2.c1=><=>c1#2
  • df1.c2< df2.c2=><=>A AS c2#6

当您加入df1和时df2,这意味着您进行了自连接。并且您的条件的以下所有组合都将被转换为c1#2 = A AS c2#6,这将为您留下简化的连接条件c1#2 = A

  • df1.c1 = df2.c2
  • df1.c2 = df2.c1
  • df2.c1 = df1.c2
  • df2.c2 = df1.c1

当您在 Spark 中执行自连接时,Spark 将重新生成正确数据帧的列 id,以避免最终数据帧中具有相同的列 id。因此,在您的情况下,它将重写 的列 ID df1所以列c1#2将引用c1的列df2

现在您的条件不包含 中的任何列df1,那么 Spark 将选择执行笛卡尔积作为连接策略。由于两个数据帧之一足够小,可以广播,因此所选算法将为BroadcastNestedLoopJoin。这就是物理计划所df3显示的内容:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
:  +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
:     +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
   +- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
      +- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]
Run Code Online (Sandbox Code Playgroud)

df1请注意,现在的四个新列 ID是[ID#46L, Status#47, c1#48, A AS c2#45]

当执行该计划时,对于 的唯一行df2, 的值c1null不同A,因此连接条件始终为 false。当您选择完全外部联接时,您将获得三行(两行来自df1,一行来自df2),null其中列来自另一个数据帧:

+----+------+----+----+----+------+----+----+
|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
+----+------+----+----+----+------+----+----+
|   4|    ok|null|   A|null|  null|null|null|
|null|  null|null|null|   1|   bad|   A|   A|
|null|  null|null|null|   4|    ok|null|   A|
+----+------+----+----+----+------+----+----+
Run Code Online (Sandbox Code Playgroud)

为什么第二次运行我得到了所需的输出?

对于第二次运行,您创建两个独立的数据帧。因此,如果我们查看df4和的物理计划df5,您可以看到列 ID 不同。这是物理计划df4

== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]
Run Code Online (Sandbox Code Playgroud)

这是物理计划df5

== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]
Run Code Online (Sandbox Code Playgroud)

您的连接条件是c1#100 = c2#126c1#100isc1列来自df4c2#126isc2列来自df5。连接条件中相等的每一端都来自不同的数据帧,因此 Spark 可以按照您的预期执行连接。

为什么这没有被检测为不明确的自连接?

从 Spark 3.0 开始,Spark 会检查您用于连接的列是否不明确。df2如果您在加入它们时颠倒了和的顺序,df1如下所示:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
   +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)

你会得到以下错误:

pyspark.sql.utils.AnalysisException:列 c2#6 不明确。

那为什么我们执行的时候没有出现这个错误呢df2.join(df1, ...)

您可以在Spark 代码中的DetectAmbigeousSelfJoin文件中找到答案:

// 当自连接发生时,分析器要求右侧计划生成
// 具有新 exprId 的属性。如果数据集的计划输出一个
由列引用引用的属性,并且该属性与 // 列引用的属性具有不同的 exprId
,则列引用是不明确的,因为它
// 引用了以下列:通过自连接重新生成。

这意味着在执行时df2.join(df1, ...),我们只会检查连接条件中使用的列df1。在我们的例子中,我们没有对 执行任何转换,与过滤df1相反,列的 exprIds 没有改变,因此不会引发不明确的列错误。df2df1

我在 Spark Jira 上创建了一个关于此行为的问题,请参阅SPARK-36874(该错误已在版本 3.2.0 中修复)。

如何预见此类行为?

您必须非常小心您的加入是否是自加入。如果您从 dataframe 开始df1,对其执行一些转换以获取df2,然后加入df1,则df2您可能会遇到这种行为。为了缓解这种情况,在进行连接时,您应该始终将原始数据帧作为第一个数据帧,因此df1.join(df2, ...)使用df2.join(df1, ...). 通过这样做,您将得到一个“Analysis Exception: Column x are ambiguous如果 Spark 无法选择正确的列”的信息。