de-*_*ner 2 python dataframe apache-spark apache-spark-sql pyspark
I'm trying to compare two data frames with have same number of columns i.e. 4 columns with id as key column in both data frames
df1 = spark.read.csv("/path/to/data1.csv")
df2 = spark.read.csv("/path/to/data2.csv")
Run Code Online (Sandbox Code Playgroud)
Now I want to append new column to DF2 i.e. column_names which is the list of the columns with different values than df1
df2.withColumn("column_names",udf())
Run Code Online (Sandbox Code Playgroud)
DF1
+------+---------+--------+------+
| id | |name | sal | Address |
+------+---------+--------+------+
| 1| ABC | 5000 | US |
| 2| DEF | 4000 | UK |
| 3| GHI | 3000 | JPN |
| 4| JKL | 4500 | CHN |
+------+---------+--------+------+
Run Code Online (Sandbox Code Playgroud)
DF2:
+------+---------+--------+------+
| id | |name | sal | Address |
+------+---------+--------+------+
| 1| ABC | 5000 | US |
| 2| DEF | 4000 | CAN |
| 3| GHI | 3500 | JPN |
| 4| JKL_M | 4800 | CHN |
+------+---------+--------+------+
Run Code Online (Sandbox Code Playgroud)
Now I want DF3
DF3:
+------+---------+--------+------+--------------+
| id | |name | sal | Address | column_names |
+------+---------+--------+------+--------------+
| 1| ABC | 5000 | US | [] |
| 2| DEF | 4000 | CAN | [address] |
| 3| GHI | 3500 | JPN | [sal] |
| 4| JKL_M | 4800 | CHN | [name,sal] |
+------+---------+--------+------+--------------+
Run Code Online (Sandbox Code Playgroud)
I saw this SO question, How to compare two dataframe and print columns that are different in scala. Tried that, however the result is different.
I'm thinking of going with a UDF function by passing row from each dataframe to udf and compare column by column and return column list. However for that both the data frames should be in sorted order so that same id rows will be sent to udf. Sorting is costly operation here. Any solution?
假设我们可以使用 id 来连接这两个数据集,我认为不需要 UDF。这可以通过使用内部连接、数组和array_remove 等函数来解决。
首先让我们创建两个数据集:
df1 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "UK"],
[3, "GHI", 3000, "JPN"],
[4, "JKL", 4500, "CHN"]
], ["id", "name", "sal", "Address"])
df2 = spark.createDataFrame([
[1, "ABC", 5000, "US"],
[2, "DEF", 4000, "CAN"],
[3, "GHI", 3500, "JPN"],
[4, "JKL_M", 4800, "CHN"]
], ["id", "name", "sal", "Address"])
Run Code Online (Sandbox Code Playgroud)
首先我们在两个数据集之间进行内部连接,然后我们df1[col] != df2[col]为每一列生成条件,除了id。当列不相等时,我们返回列名,否则返回一个空字符串。条件列表将由数组的项组成,最后我们从中删除空项:
from pyspark.sql.functions import col, array, when, array_remove
# get conditions for all columns except id
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c != 'id']
select_expr =[
col("id"),
*[df2[c] for c in df2.columns if c != 'id'],
array_remove(array(*conditions_), "").alias("column_names")
]
df1.join(df2, "id").select(*select_expr).show()
# +---+-----+----+-------+------------+
# | id| name| sal|Address|column_names|
# +---+-----+----+-------+------------+
# | 1| ABC|5000| US| []|
# | 3| GHI|3500| JPN| [sal]|
# | 2| DEF|4000| CAN| [Address]|
# | 4|JKL_M|4800| CHN| [name, sal]|
# +---+-----+----+-------+------------+
Run Code Online (Sandbox Code Playgroud)
这是您的解决方案UDF,我动态更改了dataframe名字,以便在检查过程中不会出现歧义。请查看下面的代码,如有任何疑问请告诉我。
>>> from pyspark.sql.functions import *
>>> df.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
| 1| ABC|5000| US|
| 2| DEF|4000| UK|
| 3| GHI|3000| JPN|
| 4| JKL|4500| CHN|
+---+----+----+-------+
>>> df1.show()
+---+----+----+-------+
| id|name| sal|Address|
+---+----+----+-------+
| 1| ABC|5000| US|
| 2| DEF|4000| CAN|
| 3| GHI|3500| JPN|
| 4|JKLM|4800| CHN|
+---+----+----+-------+
>>> df2 = df.select([col(c).alias("x_"+c) for c in df.columns])
>>> df3 = df1.join(df2, col("id") == col("x_id"), "left")
//udf declaration
>>> def CheckMatch(Column,r):
... check=''
... ColList=Column.split(",")
... for cc in ColList:
... if(r[cc] != r["x_" + cc]):
... check=check + "," + cc
... return check.replace(',','',1).split(",")
>>> CheckMatchUDF = udf(CheckMatch)
//final column that required to select
>>> finalCol = df1.columns
>>> finalCol.insert(len(finalCol), "column_names")
>>> df3.withColumn("column_names", CheckMatchUDF(lit(','.join(df1.columns)),struct([df3[x] for x in df3.columns])))
.select(finalCol)
.show()
+---+----+----+-------+------------+
| id|name| sal|Address|column_names|
+---+----+----+-------+------------+
| 1| ABC|5000| US| []|
| 2| DEF|4000| CAN| [Address]|
| 3| GHI|3500| JPN| [sal]|
| 4|JKLM|4800| CHN| [name, sal]|
+---+----+----+-------+------------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
24279 次 |
| 最近记录: |