为什么 left_anti join 在 pyspark 中不能按预期工作?

hei*_*nzK 6 anti-join apache-spark-sql

在数据框中,我试图识别那些在 C2 列中具有值的行,而该值在任何其他行的 C1 列中都不存在。我尝试了以下代码:

in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
in_df.show()
    +---+----+---+
    | C1|  C2| C3|
    +---+----+---+
    |  1|null|  A|
    |  2|   1|  B|
    |  3|null|  C|
    |  4|  11|  D|
    +---+----+---+
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  2|  1|  B|
    |  4| 11|  D|
    +---+---+---+
Run Code Online (Sandbox Code Playgroud)

现在应用 left_anti 连接预计只会返回第 4 行,但我也得到第 2 行:

filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  2|  1|  B|
    |  4| 11|  D|
    +---+---+---+
Run Code Online (Sandbox Code Playgroud)

如果我“具体化”过滤后的 DF,结果将如预期:

filtered = filtered.toDF(*filtered.columns)
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  4| 11|  D|
    +---+---+---+
Run Code Online (Sandbox Code Playgroud)

为什么需要这个 .toDF?

MaF*_*aFF 4

in_df.C1实际上指的是一个filtered列,如以下代码所示:

in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
filtered = in_df.filter(in_df.C2.isNotNull()).select("C2")
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
Run Code Online (Sandbox Code Playgroud)

Py4JJavaError:调用 o699.join 时发生错误。:org.apache.spark.sql.AnalysisException:无法解析in_df.C1给定输入列的“”:[C2,C1,C2,C3];;'加入 LeftAnti, ('in_df.C1 = 'filtered.C2) :- 项目 [C2#891L] : +- 过滤器 isnotnull(C2#891L) : +- 逻辑RDD [C1#890L, C2#891L, C3#892] +- 逻辑RDD [C1#900L、C2#901L、C3#902]

所以基本上,当加入 2 个数据帧时,您实际上使用的是条件filtered.C1 == filtered.C2

filtered = in_df.filter(in_df.C2.isNotNull())
filtered.join(in_df,(filtered.C1 == filtered.C2), 'left_anti').show()

    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  2|  1|  B|
    |  4| 11|  D|
    +---+---+---+
Run Code Online (Sandbox Code Playgroud)

您可能已经更改了数据框的名称,但仍然可以通过调用来引用其中的列in_df.Ci。为了确保您引用正确的数据框,您可以使用别名:

import pyspark.sql.functions as psf
filtered.alias("filtered").join(in_df.alias("in_df"),(psf.col("in_df.C1") == psf.col("filtered.C2")), 'left_anti').show()

    +---+---+---+
    | C1| C2| C3|
    +---+---+---+
    |  4| 11|  D|
    +---+---+---+
Run Code Online (Sandbox Code Playgroud)

处理列名歧义的最佳方法是从一开始就避免它们(重命名列或使用数据框的别名)。