DeltaLake 合并具有空值的列

MMa*_*udi 6 merge apache-spark apache-spark-sql delta-lake

我正在使用 DeltaLake API 使用下面的代码更新表中的行

DeltaTable.forPath(sparkSession, cleanDataPath)
          .as("target")
          .merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
          .whenMatched()
          .updateAll()
          .whenNotMatched()
          .insertAll()
          .execute();
Run Code Online (Sandbox Code Playgroud)

这应该匹配源表和目标表之间的所有列,除了列valuationtag

合并之前,目标表如下所示

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|210611170317|
|          Sample|967.93|   2021-06-10|    210611170317|210611170317|
|          Sample| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

源表(应更新目标表)如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

valuationtag更改为OFFICIAL。有了这个,更新后的表是

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|          Sample|499.97|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|          Sample|967.93|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

到目前为止,一切都很好。

当列(在两个表中)包含空值时,问题就开始了。假设desk_mirror_name目标表中的列更改为空

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

对于具有完全相同数据的源表,除了valuationtag更改为OFFICIAL之外,更新后的表奇怪的是插入了新行,而不是合并。结果如下

+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
|            null|499.97|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|210611170317|
|            null| 500.0|   2021-06-10|    210611170317|210611170317|
|            null|967.93|   2021-06-10|    210611170317|    OFFICIAL|
|            null| 500.0|   2021-06-10|    210611170317|    OFFICIAL|
|            null|499.97|   2021-06-10|    210611170317|    OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)

似乎 DeltaLake 没有正确处理desk_mirror_name,它在源表和目标表中都有空值。

遇到这样的具体情况该如何处理呢?

Kar*_*raj 12

这是 Spark 中的预期行为。Apache Spark 支持标准比较运算符,例如>, >=, =, < and <=.这些运算符的结果未知或NULL其中一个操作数或两个操作数未知或NULL。为了比较 NULL 值是否相等,Spark 提供了一个空安全等于运算符(<=>),当其中一个操作数为 NULL 时返回 False,当两个操作数均为 NULL 时返回 True。参考链接

由于这个原因,Null被认为是WHEN NOT MATCHED条款只能有INSERT行动。新行是根据指定的列和相应的表达式生成的。不需要指定目标表中的所有列。对于未指定的目标列,NULL插入。