Apache Spark中的DataFrame相等

Sim*_*Sim 23 scala dataframe apache-spark rdd apache-spark-sql

假设df1并且df2DataFrameApache Spark 中的两个,使用两种不同的机制计算,例如,Spark SQL与Scala/Java/Python API.

是否存在一种惯用的方法来确定两个数据帧是否相等(相等,同构),其中等价由数据确定(每行的列名和列值)是否相同,除了行和列的排序?

这个问题的动机是,通常有很多方法来计算一些大数据结果,每种方法都有自己的权衡.在探讨这些权衡时,重要的是要保持正确性,因此需要检查有意义的测试数据集的等效性/相等性.

Hol*_*den 14

Apache Spark测试套件中有一些标准方法,但其中大多数涉及在本地收集数据,如果您想在大型DataFrame上进行相等测试,那么这可能不是一个合适的解决方案.

首先检查模式,然后你可以与df3进行交集并验证df1,df2和df3的计数是否相等(但是这只有在没有重复行的情况下才有效,如果有不同的重复行,这个方法仍然可以返回true).

另一个选择是获取两个DataFrame的底层RDD,映射到(Row,1),执行reduceByKey来计算每个Row的数量,然后将两个生成的RDD组合在一起,然后进行常规聚合并返回false任何迭代器都不相等.

  • 在重复行的情况下,附加一个额外的“count”列(当然通过计算functions.agg或通过SQL)然后将交集作为df3怎么样? (2认同)

Nic*_*mas 9

我不知道惯用语,但我认为你可以得到一种强大的方法来比较DataFrames,如下所述.(我使用PySpark进行说明,但这种方法可以使用多种语言.)

a = spark.range(5)
b = spark.range(5)

a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()

assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0
Run Code Online (Sandbox Code Playgroud)

此方法可以正确处理DataFrame可能具有重复行,不同顺序的行和/或不同顺序列的情况.

例如:

a = spark.createDataFrame([('nick', 30), ('bob', 40)], ['name', 'age'])
b = spark.createDataFrame([(40, 'bob'), (30, 'nick')], ['age', 'name'])
c = spark.createDataFrame([('nick', 30), ('bob', 40), ('nick', 30)], ['name', 'age'])

a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()
c_prime = c.groupBy(sorted(c.columns)).count()

assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0
assert a_prime.subtract(c_prime).count() != 0
Run Code Online (Sandbox Code Playgroud)

这种方法非常昂贵,但鉴于需要执行完全差异,大部分费用是不可避免的.这应该可以扩展,因为它不需要在本地收集任何东西.如果你放松了比较应该考虑重复行的约束,那么你可以放弃groupBy()并且只是做subtract(),这可能会显着加快速度.

  • 请注意,这不适用于任何无法排序的数据类型,例如地图,在这种情况下,您可能必须删除这些列并单独执行. (2认同)

Pow*_*ers 6

火花快速测试库中有使数据帧比较(我是图书馆的创建者)两种方法:

assertSmallDataFrameEquality方法在驱动程序节点上收集DataFrames并进行比较

def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}
Run Code Online (Sandbox Code Playgroud)

assertLargeDataFrameEquality方法比较分布在多台计算机上的DataFrames(代码基本上是从spark-testing-base复制的)

def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    actualDF.rdd.cache
    expectedDF.rdd.cache

    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }

    val expectedIndexValue = zipWithIndex(actualDF.rdd)
    val resultIndexValue = zipWithIndex(expectedDF.rdd)

    val unequalRDD = expectedIndexValue
      .join(resultIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }

    val maxUnequalRowsToShow = 10
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))

  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}
Run Code Online (Sandbox Code Playgroud)

assertSmallDataFrameEquality 对于较小的DataFrame比较,速度更快,并且我发现它足以用于我的测试套件。

  • 对于 Pyspark 人员:提供的函数考虑了排序。如果您只关心内容,请将第二个条件替换为:`if df1.orderBy(*df1.columns).collect() !=df2.orderBy(*df2.columns).collect():` (3认同)
  • 看起来是个不错的图书馆! (2认同)
  • 对于我们这些在这里绊倒并实现收集的人来说,与 `!actualDF.collect().sameElements(expectedDF.collect())` 进行比较。请注意下面的帖子,并警惕“sameElements()”的荒谬之处 /sf/ask/2030595031/# 29008501 (2认同)

小智 5

爪哇:

assert resultDs.union(answerDs).distinct().count() == resultDs.intersect(answerDs).count();
Run Code Online (Sandbox Code Playgroud)


小智 5

尝试执行以下操作:

df1.except(df2).isEmpty
Run Code Online (Sandbox Code Playgroud)

  • 当“df2”大于“df1”的情况下,这不起作用。也许如果你通过添加 `&& df2. except(df1).isEmpty` 使其对称... (3认同)

Enr*_*coM 5

一种可扩展且简单的方法是比较两个DataFrames 并计算不匹配的行数:

df1.diff(df2).where($"diff" != "N").count
Run Code Online (Sandbox Code Playgroud)

如果该数字不为零,则两个DataFrames 不相等。

该转换由spark-extensiondiff提供。

它标识插入、更改、删除和更改的行。

  • 如果您指的是 `df1.collect() != df2.collect()` PySpark 解决方案,则这根本不可扩展。两个 DataFrame 都被加载到驱动程序的内存中。上面的“diff”转换随集群扩展,这意味着如果您的集群可以处理 DataFrame,它就可以处理 diff。那么答案是:是的。 (2认同)

LeO*_* Li 5

有 4 个选项,具体取决于您是否有重复的行。

假设我们有两个DataFrames,z1 和 z1。选项 1/2 适用于没有重复的行。您可以在 中尝试这些spark-shell

  • 选项1:直接执行 except
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column

def isEqual(left: DataFrame, right: DataFrame): Boolean = {
   if(left.columns.length != right.columns.length) return false // column lengths don't match
   if(left.count != right.count) return false // record count don't match
   return left.except(right).isEmpty && right.except(left).isEmpty
}
Run Code Online (Sandbox Code Playgroud)
  • 选项2:按列生成行哈希
def createHashColumn(df: DataFrame) : Column = {
   val colArr = df.columns
   md5(concat_ws("", (colArr.map(col(_))) : _*))
}

val z1SigDF = z1.select(col("index"), createHashColumn(z1).as("signature_z1"))
val z2SigDF = z2.select(col("index"), createHashColumn(z2).as("signature_z2"))
val joinDF = z1SigDF.join(z2SigDF, z1SigDF("index") === z2SigDF("index")).where($"signature_z1" =!= $"signature_z2").cache
// should be 0
joinDF.count
Run Code Online (Sandbox Code Playgroud)
  • 选项 3:使用GroupBy(对于具有重复行的 DataFrame)
val z1Grouped = z1.groupBy(z1.columns.map(c => z1(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")
val z2Grouped = z2.groupBy(z2.columns.map(c => z2(c)).toSeq : _*).count().withColumnRenamed("count", "recordRepeatCount")

val inZ1NotInZ2 = z1Grouped.except(z2Grouped).toDF()
val inZ2NotInZ1 = z2Grouped.except(z1Grouped).toDF()
// both should be size 0
inZ1NotInZ2.show
inZ2NotInZ1.show
Run Code Online (Sandbox Code Playgroud)
  • 选项 4,使用exceptAll,它也适用于具有重复行的数据
// Source Code: https://github.com/apache/spark/blob/50538600ec/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2029
val inZ1NotInZ2 = z1.exceptAll(z2).toDF()
val inZ2NotInZ1 = z2.exceptAll(z1).toDF()
// same here, // both should be size 0
inZ1NotInZ2.show
inZ2NotInZ1.show
Run Code Online (Sandbox Code Playgroud)