使用Spark`DataFrame`的`unionAll`出了什么问题?

Mar*_*nne 20 scala dataframe apache-spark apache-spark-sql

使用Spark 1.5.0并给出以下代码,我希望unionAll DataFrame基于它们的列名进行联合.在代码中,我使用一些FunSuite传递SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}
Run Code Online (Sandbox Code Playgroud)

输出:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+
Run Code Online (Sandbox Code Playgroud)

为什么结果包含混合的"b"和"a"列,而不是根据列名对齐列?听起来像一个严重的错误!?

zer*_*323 37

它根本不像一个bug.您看到的是标准的SQL行为,每个主要的RDMBS,包括PostgreSQL,MySQL,OracleMS SQL都表现完全相同.您将找到与名称链接的SQL Fiddle示例.

引用PostgreSQL手册:

为了计算两个查询的并集,交集或差异,这两个查询必须是"联合兼容",这意味着它们返回相同数量的列,并且相应的列具有兼容的数据类型

列名称(不包括set操作中的第一个表)将被忽略.

这种行为直接来自关系代数,其中基本构建块是一个元组.由于元组是有序的,因此两组元组的并集对于您在此处获得的输出是等效的(忽略重复处理).

如果你想使用名称匹配,你可以做这样的事情

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}
Run Code Online (Sandbox Code Playgroud)

要检查名称和类型,应该足以替换columns为:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq
Run Code Online (Sandbox Code Playgroud)