Spark unionAll多个数据帧

ech*_*cho 24 scala apache-spark apache-spark-sql

对于一组数据帧

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")
Run Code Online (Sandbox Code Playgroud)

把他们所有人联合起来

df1.unionAll(df2).unionAll(df3)
Run Code Online (Sandbox Code Playgroud)

是否有更优雅和可扩展的方式为任意数量的数据帧执行此操作,例如

Seq(df1, df2, df3) 
Run Code Online (Sandbox Code Playgroud)

zer*_*323 38

最简单的解决办法是reduceunion(unionAll在火花<2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)
Run Code Online (Sandbox Code Playgroud)

这是相对简洁的,不应该从堆外存储移动数据,但扩展沿着每个联合的沿袭需要非线性时间来执行计划分析.如果你试图合并大量的,可能会有什么问题DataFrames.

您还可以转换为RDDs并使用SparkContext.union:

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}
Run Code Online (Sandbox Code Playgroud)

它保持谱系短分析成本低,但否则它比DataFrames直接合并效率低.

  • 如果有很多(比如超过 20 个)DataFrame,性能如何? (6认同)
  • pySpark中的等效代码如何? (2认同)
  • 对大量 DF 的性能也很好奇 (2认同)

TH2*_*H22 19

对于pyspark,您可以执行以下操作:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)
Run Code Online (Sandbox Code Playgroud)

数据帧中列的顺序应该相同,这一点也不值得。如果您没有正确的列顺序,这可以默默地给出意想不到的结果!!

如果您使用的是pyspark 2.3或更高版本,则可以使用unionByName,因此不必重新排序列。

  • 请记住粗体提到的一点。 (5认同)
  • 我如何添加像“allowMissingColumns=True”这样的参数? (5认同)