PySpark:使用另一个数据框插入或更新数据框

nav*_*vin 3 python upsert apache-spark-sql pyspark pyspark-dataframes

我有两个数据框,DF1 和 DF2。DF1 为主,DF2 为增量。来自DF2 的数据应插入DF1 或用于更新DF1 数据。

假设 DF1 具有以下格式:

证件号码 开始日期 数量
1 2016-01-01 4650 22
2 2016-01-02 3130 45
1 2016-01-03 4456 22
2 2016-01-15 1234 45

DF2 包含以下内容:

证件号码 开始日期 数量
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

我需要组合两个数据帧,如果 DF2 的“id_no”和“开始日期”与 DF1 匹配,则应在 DF1 中替换它,如果不匹配,则应将其插入到 DF1 中。“id_no”不是唯一的。

预期结果:

证件号码 开始日期 数量
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-03 4456 22
2 2016-01-15 1234 45
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

Psi*_*dom 5

您可以在id_noand上连接两个数据框start_date,然后coalesceamountdays列中先加入列df2

import pyspark.sql.functions as f

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    f.coalesce('b.amount', 'a.amount').alias('amount'), 
    f.coalesce('b.days', 'a.days').alias('days')
).show()

+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+
Run Code Online (Sandbox Code Playgroud)

如果您有更多列:

cols = ['amount', 'days']

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+
Run Code Online (Sandbox Code Playgroud)