Zyg*_*ygD 7 join apache-spark apache-spark-sql pyspark
更新:根本问题是Spark 3.2.0 中修复的一个错误。
两次运行中的输入 df 结构相同,但输出不同。只有第二次运行才返回所需的结果 ( df6)。我知道我可以使用数据帧的别名来返回所需的结果。
问题。Spark 创建的底层机制是什么df3?Spark读取df1.c1 == df2.c2了join'son子句,但很明显它没有关注提供的dfs。引擎盖下有什么?如何预见此类行为?
第一次运行(错误df3结果):
data = [
(1, 'bad', 'A'),
(4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()
#+----+------+----+----+----+------+----+----+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+----+------+----+----+
#| 4| ok|null| A|null| null|null|null|
#|null| null|null|null| 1| bad| A| A|
#|null| null|null|null| 4| ok|null| A|
#+----+------+----+----+----+------+----+----+
Run Code Online (Sandbox Code Playgroud)
第二次运行(正确df6结果):
data = [
(1, 'bad', 'A', 'A'),
(4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()
#+----+------+----+----+---+------+----+---+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+---+------+----+---+
#|null| null|null|null| 4| ok|null| A|
#| 4| ok|null| A| 1| bad| A| A|
#+----+------+----+----+---+------+----+---+
Run Code Online (Sandbox Code Playgroud)
我可以看到物理计划的不同之处在于内部使用了不同的联接(BroadcastNestedLoopJoin和SortMergeJoin)。但这本身并不能解释为什么结果不同,因为对于不同的内部连接类型,结果应该仍然相同。
df3.explain()
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
: +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
: +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
+- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
+- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]
df6.explain()
== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
: +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
: +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
+- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
Run Code Online (Sandbox Code Playgroud)
连接取决于连接数据帧的结构,但构建这些数据帧的方式也会产生影响。如果您加入的两个数据帧共享相同的lineage,则连接条件中可能会出现不明确的列问题,从而导致您在问题中描述的内容。
在第一次运行中,当您df2从构建时df1,两个数据帧共享相同的谱系。当您连接这两个数据帧时,实际上是在进行自连接,Spark 选择仅属于其中一个连接数据帧的错误列作为连接条件,从而产生笛卡尔积,后跟始终为 false 的过滤器。
在第二次运行中,由于两个数据帧是独立构建的,因此连接条件是通过两列之间的相等性正确定义的,每列属于不同的数据帧。因此 Spark 执行经典的连接。
正如pltc在他的回答中所解释的那样,在第一次运行时 Spark 不会为您的连接选择正确的列。让我们找出原因。
df1让我们从获取和df2使用的物理计划开始explain。这是物理计划df1:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)
这是物理计划df2:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)
您可以在第一行中看到两个数据框(1) Project开头,并具有相同的列名和 id:。这并不奇怪,因为是从创建的,所以您可以看到附加的转换。所以我们有以下参考:df1df2 [ID#0L, Status#1, c1#2, A AS c2#6]df2df1df2df1
df1.c1< df2.c1=><=>c1#2df1.c2< df2.c2=><=>A AS c2#6当您加入df1和时df2,这意味着您进行了自连接。并且您的条件的以下所有组合都将被转换为c1#2 = A AS c2#6,这将为您留下简化的连接条件c1#2 = A:
df1.c1 = df2.c2df1.c2 = df2.c1df2.c1 = df1.c2df2.c2 = df1.c1当您在 Spark 中执行自连接时,Spark 将重新生成正确数据帧的列 id,以避免最终数据帧中具有相同的列 id。因此,在您的情况下,它将重写 的列 ID df1。所以列c1#2将引用c1的列df2。
现在您的条件不包含 中的任何列df1,那么 Spark 将选择执行笛卡尔积作为连接策略。由于两个数据帧之一足够小,可以广播,因此所选算法将为BroadcastNestedLoopJoin。这就是物理计划所df3显示的内容:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
: +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
: +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
+- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
+- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]
Run Code Online (Sandbox Code Playgroud)
df1请注意,现在的四个新列 ID是[ID#46L, Status#47, c1#48, A AS c2#45]。
当执行该计划时,对于 的唯一行df2, 的值c1与null不同A,因此连接条件始终为 false。当您选择完全外部联接时,您将获得三行(两行来自df1,一行来自df2),null其中列来自另一个数据帧:
+----+------+----+----+----+------+----+----+
| ID|Status| c1| c2| ID|Status| c1| c2|
+----+------+----+----+----+------+----+----+
| 4| ok|null| A|null| null|null|null|
|null| null|null|null| 1| bad| A| A|
|null| null|null|null| 4| ok|null| A|
+----+------+----+----+----+------+----+----+
Run Code Online (Sandbox Code Playgroud)
对于第二次运行,您创建两个独立的数据帧。因此,如果我们查看df4和的物理计划df5,您可以看到列 ID 不同。这是物理计划df4:
== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]
Run Code Online (Sandbox Code Playgroud)
这是物理计划df5:
== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]
Run Code Online (Sandbox Code Playgroud)
您的连接条件是c1#100 = c2#126、c1#100isc1列来自df4和c2#126isc2列来自df5。连接条件中相等的每一端都来自不同的数据帧,因此 Spark 可以按照您的预期执行连接。
从 Spark 3.0 开始,Spark 会检查您用于连接的列是否不明确。df2如果您在加入它们时颠倒了和的顺序,df1如下所示:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
Run Code Online (Sandbox Code Playgroud)
你会得到以下错误:
pyspark.sql.utils.AnalysisException:列 c2#6 不明确。
那为什么我们执行的时候没有出现这个错误呢df2.join(df1, ...)?
您可以在Spark 代码中的DetectAmbigeousSelfJoin文件中找到答案:
// 当自连接发生时,分析器要求右侧计划生成
// 具有新 exprId 的属性。如果数据集的计划输出一个
由列引用引用的属性,并且该属性与 // 列引用的属性具有不同的 exprId
,则列引用是不明确的,因为它
// 引用了以下列:通过自连接重新生成。
这意味着在执行时df2.join(df1, ...),我们只会检查连接条件中使用的列df1。在我们的例子中,我们没有对 执行任何转换,与过滤df1相反,列的 exprIds 没有改变,因此不会引发不明确的列错误。df2df1
我在 Spark Jira 上创建了一个关于此行为的问题,请参阅SPARK-36874(该错误已在版本 3.2.0 中修复)。
您必须非常小心您的加入是否是自加入。如果您从 dataframe 开始df1,对其执行一些转换以获取df2,然后加入df1,则df2您可能会遇到这种行为。为了缓解这种情况,在进行连接时,您应该始终将原始数据帧作为第一个数据帧,因此df1.join(df2, ...)使用df2.join(df1, ...). 通过这样做,您将得到一个“Analysis Exception: Column x are ambiguous如果 Spark 无法选择正确的列”的信息。
| 归档时间: |
|
| 查看次数: |
1216 次 |
| 最近记录: |