Int*_*ers 2 python join apache-spark-sql pyspark databricks
假设我们有两个数据帧,我们想要将它们与左反连接进行比较:
data1 = [
(1, 11, 20, None),
(2, 12, 22, 31),
]
data2 = [
(1, 11, 20, None),
(2, 12, 22, 31),
]
schema = StructType([ \
StructField("value_1",IntegerType(), True), \
StructField("value_2",IntegerType(), True), \
StructField("value_3",IntegerType(), True), \
StructField("value_4",IntegerType(), True), \
])
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
Run Code Online (Sandbox Code Playgroud)
如何通过多个(所有)列 nullsafe 连接这些数据帧?我想出的唯一解决方案如下:
df = df1.join(df2, \
((df1.value_1.eqNullSafe(df2.value_1)) &
(df1.value_2.eqNullSafe(df2.value_2)) &
(df1.value_3.eqNullSafe(df2.value_3)) &
(df1.value_4.eqNullSafe(df2.value_4))),
"leftanti" \
)
Run Code Online (Sandbox Code Playgroud)
但不幸的是,我们现在必须处理大量列的动态列表。我们如何以某种方式重写此连接,以便我们可以提供要连接的列的列表。
感谢和BR
据我理解问题陈述,您希望根据提供的列列表创建动态连接条件。我们可以使用reduce()fromfunctools模块来做到这一点。
join_cols = ['value_1', 'value_2', 'value_3', 'value_4']
from functools import reduce
join_condition = reduce(lambda x, y: x & y, [df1[k].eqNullSafe(df2[k]) for k in join_cols])
print(join_condition)
# Column<'((((value_1 <=> value_1) AND (value_2 <=> value_2)) AND (value_3 <=> value_3)) AND (value_4 <=> value_4))'>
Run Code Online (Sandbox Code Playgroud)
可以直接使用join_condition里面的参数.join()。
df = df1.join(df2, join_condition, "leftanti")
Run Code Online (Sandbox Code Playgroud)