MMa*_*udi 6 merge apache-spark apache-spark-sql delta-lake
我正在使用 DeltaLake API 使用下面的代码更新表中的行
DeltaTable.forPath(sparkSession, cleanDataPath)
.as("target")
.merge(df.as("source"), "target.desk_mirror_name = source.desk_mirror_name AND target.price = source.price AND target.valuationdate = source.valuationdate AND target.valuationversion = source.valuationversion")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute();
Run Code Online (Sandbox Code Playgroud)
这应该匹配源表和目标表之间的所有列,除了列valuationtag
合并之前,目标表如下所示
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317|210611170317|
| Sample|967.93| 2021-06-10| 210611170317|210611170317|
| Sample| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
源表(应更新目标表)如下
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
仅valuationtag
更改为OFFICIAL。有了这个,更新后的表是
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| Sample|499.97| 2021-06-10| 210611170317| OFFICIAL|
| Sample| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| Sample|967.93| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
到目前为止,一切都很好。
当列(在两个表中)包含空值时,问题就开始了。假设desk_mirror_name
目标表中的列更改为空
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
对于具有完全相同数据的源表,除了valuationtag
更改为OFFICIAL之外,更新后的表奇怪的是插入了新行,而不是合并。结果如下
+----------------+------+-------------+----------------+------------+
|desk_mirror_name| price|valuationdate|valuationversion|valuationtag|
+----------------+------+-------------+----------------+------------+
| null|499.97| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317|210611170317|
| null| 500.0| 2021-06-10| 210611170317|210611170317|
| null|967.93| 2021-06-10| 210611170317| OFFICIAL|
| null| 500.0| 2021-06-10| 210611170317| OFFICIAL|
| null|499.97| 2021-06-10| 210611170317| OFFICIAL|
+----------------+------+-------------+----------------+------------+
Run Code Online (Sandbox Code Playgroud)
似乎 DeltaLake 没有正确处理desk_mirror_name
,它在源表和目标表中都有空值。
遇到这样的具体情况该如何处理呢?
Kar*_*raj 12
这是 Spark 中的预期行为。Apache Spark 支持标准比较运算符,例如>, >=, =, < and <=.
这些运算符的结果未知或NULL
其中一个操作数或两个操作数未知或NULL
。为了比较 NULL 值是否相等,Spark 提供了一个空安全等于运算符(<=>)
,当其中一个操作数为 NULL 时返回 False,当两个操作数均为 NULL 时返回 True。参考链接。
由于这个原因,Null
被认为是WHEN NOT MATCHED
条款只能有INSERT
行动。新行是根据指定的列和相应的表达式生成的。不需要指定目标表中的所有列。对于未指定的目标列,NULL
插入。
归档时间: |
|
查看次数: |
4888 次 |
最近记录: |