Comparing a PySpark DataFrame with another DataFrame

Shi*_*ijo 5 python dataframe apache-spark-sql pyspark

I have two DataFrames to compare. They have the same number of columns, and the comparison result should contain the mismatched fields along with their actual and expected values and the row id.

DataFrame one

+-----+---+--------+
| name| id|    City|
+-----+---+--------+
|  Sam|  3| Toronto|
| BALU| 11|     YYY|
|CLAIR|  7|Montreal|
|HELEN| 10|  London|
|HELEN| 16|  Ottawa|
+-----+---+--------+

DataFrame two

+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
|          SAM|          3|      Toronto|
|         BALU|         11|          YYY|
|        CLARE|          7|     Montreal|
|        HELEN|         10|        Londn|
|        HELEN|         15|       Ottawa|
+-------------+-----------+-------------+

Expected output

+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
|  7|       CLAIR|         CLARE| name|
|  3|         Sam|           SAM| name|
| 10|      London|         Londn| City|
+---+------------+--------------+-----+

Create the sample data

from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

sc = SparkContext()
sql_context = SQLContext(sc)

spark = SparkSession.builder.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # only log errors

df_Actual = sql_context.createDataFrame(
    [("Sam", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLAIR", 7,'Montreal'), 
     ("HELEN", 10,'London'), ("HELEN", 16,'Ottawa')],
    ["name", "id","City"]
)

df_Expected = sql_context.createDataFrame(
     [("SAM", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLARE", 7,'Montreal'), 
      ("HELEN", 10,'Londn'), ("HELEN", 15,'Ottawa')],
     ["Expected_name", "Expected_id","Expected_City"]
)
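As an aside, on Spark 2.x and later the SQLContext detour is not needed: SparkSession.createDataFrame builds the same frames directly. A minimal sketch for the first frame, reusing the spark session created above:

# Equivalent construction via the SparkSession API (Spark 2.x+)
df_Actual = spark.createDataFrame(
    [("Sam", 3, 'Toronto'), ("BALU", 11, 'YYY'), ("CLAIR", 7, 'Montreal'),
     ("HELEN", 10, 'London'), ("HELEN", 16, 'Ottawa')],
    ["name", "id", "City"]
)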

Create an empty DataFrame for the result

# Note: ID is declared as a string; when the long id column is unioned in
# below, Spark's set-operation type coercion widens it to string
field = [
    StructField("ID", StringType(), True),
    StructField("Actual_value", StringType(), True),
    StructField("Expected_value", StringType(), True),
    StructField("Field", StringType(), True)
]

schema = StructType(field)
Df_Result = sql_context.createDataFrame(sc.emptyRDD(), schema)

Join expected and actual on id

df_combined = df_Actual.join(df_Expected, df_Actual.id == df_Expected.Expected_id)

col_names = df_Actual.schema.names
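Note that this is an inner join, so ids present in only one frame (16 on the actual side, 15 on the expected side) drop out here and can never surface as mismatches, which matches the expected output above.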

Loop through each column to find mismatches

for col_name in col_names:

    #Filter for column values not matching
    df_comp = df_combined.filter(col(col_name) != col("Expected_" + col_name))\
        .select(col('id'), col(col_name), col("Expected_" + col_name))

    #Add not matching column name
    df_comp = df_comp.withColumn("Field", lit(col_name))

    #Add to final result
    Df_Result = Df_Result.union(df_comp)
Df_Result.show()

This code works as expected. However, in the real scenario I have many more columns and millions of rows to compare, and with this code the comparison takes a long time. Is there a better way to improve the performance while getting the same result?

Shi*_*ijo 1

For anyone looking for an answer: I transposed the DataFrames and then did the comparison.

from pyspark.sql.functions import array, col, explode, struct, lit
def Transposedf(df, by, colheader):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))

    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias("Field"), col(c).alias(colheader)) for c in cols])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.Field", "kvs." + colheader])
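For illustration (my own derivation from the sample data; row order may differ), Transposedf(df_Actual, ["id"], 'Actual_value') reshapes the five rows into one row per (id, column) pair:

+---+-----+------------+
| id|Field|Actual_value|
+---+-----+------------+
|  3| name|         Sam|
|  3| City|     Toronto|
| 11| name|        BALU|
| 11| City|         YYY|
|  7| name|       CLAIR|
|  7| City|    Montreal|
| 10| name|       HELEN|
| 10| City|      London|
| 16| name|       HELEN|
| 16| City|      Ottawa|
+---+-----+------------+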

The comparison then looks like this:

def Compare_df(df_Expected, df_Actual):
    df_combined = (df_Actual
        .join(df_Expected, ((df_Actual.id == df_Expected.id)
                            & (df_Actual.Field == df_Expected.Field)
                            & (df_Actual.Actual_value != df_Expected.Expected_value)))
        .select([df_Actual.id, df_Actual.Field, df_Actual.Actual_value, df_Expected.Expected_value])
    )
    return df_combined
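Because the Actual_value != Expected_value test sits inside the join condition, rows that match exactly are discarded during the join itself: there is no per-column filter pass and no growing chain of unions, which is presumably where the original loop spent its time.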

I call the two functions like this:

df_Actual = Transposedf(df_Actual, ["id"], 'Actual_value')
df_Expected = Transposedf(df_Expected, ["id"], 'Expected_value')

# Compare the expected and actual
df_result = Compare_df(df_Expected, df_Actual)
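With the sample data above, df_result should contain the same three mismatches as the loop version (name for ids 3 and 7, City for id 10), now computed with a single join over the transposed frames.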