pyspark - 使用 OR 条件连接

Ric*_*c S 2 python join dataframe apache-spark pyspark

如果至少满足两个条件之一,我想加入两个 pyspark 数据帧。

玩具数据:

df1 = spark.createDataFrame([
    (10, 1, 666),
    (20, 2, 777),
    (30, 1, 888),
    (40, 3, 999),
    (50, 1, 111),
    (60, 2, 222),
    (10, 4, 333),
    (50, None, 444),
    (10, 0, 555),
    (50, 0, 666)
    ],
    ['var1', 'var2', 'other_var'] 
)

df2 = spark.createDataFrame([
    (10, 1),
    (20, 2),
    (30, None),
    (30, 0)
    ],
    ['var1_', 'var2_'] 
)
Run Code Online (Sandbox Code Playgroud)

我想维护所有那些存在于 的不同值中的行df1var1或者df2.var1_ 存在 var2于 的不同值中的所有行df2.var2_(但不是在该值为 0 的情况下)。

因此,预期输出将是

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|   # join on both var1 and var2
|  20|   2|      777|   20|    2|   # join on both var1 and var2
|  30|   1|      888|   10|    1|   # join on both var1 and var2
|  50|   1|      111|   10|    1|   # join on var2
|  60|   2|      222|   20|    2|   # join on var2
|  10|   4|      333|   10|    1|   # join on var1
|  10|   0|      555|   10|    1|   # join on var1
+----+----+---------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

在其他尝试中,我尝试过

cond = [(df1.var1 == (df2.select('var1_').distinct()).var1_) | (df1.var2 == (df2.filter(F.col('var2_') != 0).select('var2_').distinct()).var2_)]
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  20|   2|      777|   20|    2|
|  30|   1|      888|   10|    1|
|  50|   1|      111|   10|    1|
|  30|   1|      888|   30| null|
|  30|   1|      888|   30|    0|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
|  10|   0|      555|   30|    0|
|  50|   0|      666|   30|    0|
+----+----+---------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

但我获得的行数比预期多,并且行var2 == 0也被保留。

我究竟做错了什么?

注意:我没有使用该.isin方法,因为我的实际数据df2大约有 20k 行,并且我在这里读到,这种具有大量 ID 的方法可能会产生较差的性能。

mck*_*mck 5

尝试以下条件:

cond = (df2.var2_ != 0) & ((df1.var1 == df2.var1_) | (df1.var2 == df2.var2_))
df1\
    .join(df2, how='inner', on=cond)\
    .show()

+----+----+---------+-----+-----+
|var1|var2|other_var|var1_|var2_|
+----+----+---------+-----+-----+
|  10|   1|      666|   10|    1|
|  30|   1|      888|   10|    1|
|  20|   2|      777|   20|    2|
|  50|   1|      111|   10|    1|
|  60|   2|      222|   20|    2|
|  10|   4|      333|   10|    1|
|  10|   0|      555|   10|    1|
+----+----+---------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

条件应仅包括要连接的两个数据帧中的列。如果要删除var2_ = 0,可以将它们作为连接条件,而不是作为过滤器。

也不需要指定distinct,因为它不影响相等条件,并且还增加了不必要的步骤。