根据 pyspark 条件使用其他列值覆盖列值

Use*_*345 6 apache-spark pyspark

我有一个data framepyspark像下面。

df.show()

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|        null|       Poland|
|     815518|       MA401|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
+-----------+------------+-------------+
Run Code Online (Sandbox Code Playgroud)

我有另一个数据框,如下所示 df1.show()

+-----------+------------+
|customer_id|product_name|
+-----------+------------+
|   12870946|     GS748TS|
|     815518|       MA402|
|    3138420|        null|
|    3178864|    WGR614v6|
|    7456796|       XE102|
|   21893468|     AGM731F|
|       null|       AE171|
+-----------+------------+
Run Code Online (Sandbox Code Playgroud)

现在我想fuller outer join在这些表上做一个并更新product_name列值,如下所示。

1) Overwrite the values in `df` using values in `df1` if there are values in `df1`.
2) if there are `null` values or `no` values in `df1` then leave the values in `df` as they are 
Run Code Online (Sandbox Code Playgroud)

expected result

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|     GS748TS|       Poland|
|     815518|       MA402|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
|       null|       AE171|         null|
+-----------+------------+-------------+
Run Code Online (Sandbox Code Playgroud)

我做了如下

import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer').select(df.customer_id, f.coalesce(df.product_name, df1.product_name).alias('product_name'), df.country)
Run Code Online (Sandbox Code Playgroud)

但我得到的结果是不同的

df2.show()

+-----------+------------+-------------+
|customer_id|product_name|      country|
+-----------+------------+-------------+
|   12870946|        null|       Poland|
|     815518|       MA401|United States|
|    3138420|     WG111v2|           UK|
|    3178864|    WGR614v6|United States|
|    7456796|       XE102|United States|
|   21893468|     AGM731F|United States|
|       null|       AE171|         null|
+-----------+------------+-------------+
Run Code Online (Sandbox Code Playgroud)

我怎样才能得到 expected result

pau*_*ult 6

您编写的代码为我生成了正确的输出,因此我无法重现您的问题。我看过其他帖子,其中在执行连接时使用别名解决了问题,因此这里是您的代码的稍微修改版本,它将执行相同的操作:

import pyspark.sql.functions as f

df.alias("r").join(df1.alias("l"), on="customer_id", how='full_outer')\
    .select(
        "customer_id",
        f.coalesce("r.product_name", "l.product_name").alias('product_name'),
        "country"
    )\
    .show()
#+-----------+------------+-------------+
#|customer_id|product_name|      country|
#+-----------+------------+-------------+
#|    7456796|       XE102|United States|
#|    3178864|    WGR614v6|United States|
#|       null|       AE171|         null|
#|     815518|       MA401|United States|
#|    3138420|     WG111v2|           UK|
#|   12870946|     GS748TS|       Poland|
#|   21893468|     AGM731F|United States|
#+-----------+------------+-------------+
Run Code Online (Sandbox Code Playgroud)

当我运行您的代码时,我也得到了相同的结果(转载如下):

df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )\
    .show()
Run Code Online (Sandbox Code Playgroud)

我正在使用 spark 2.1 和 python 2.7.13。