Scala/Spark:不可变的数据帧和内存

Jak*_*ke 3 scala apache-spark

我是Scala的新手.我有Java和R的经验

我对DataFrames和内存管理的不变性感到困惑.原因是这样的:

R中的Dataframe也是不可变的.随后,在R中发现它是不可行的.(简单地说)当处理大量列时,每次转换都会导致新的Dataframe.1000个连续列上的1000次连续操作将导致1000个Dataframe对象).现在,大多数数据科学家更喜欢R的data.table,它通过引用在单个data.table对象上执行操作.

Scala的数据框(对新手)似乎有类似的问题.例如,以下代码在重命名1000列时似乎创建了1000个数据帧.尽管有foldLeft(),但每次调用withColumn()都会创建一个新的DataFrame实例.

那么,我是否相信Scala中非常有效的垃圾收集,或者我是否需要尝试限制创建的不可变实例的数量.如果是后者,我应该考虑哪些技巧?

def castAllTypedColumnsTo(df: DataFrame,
                        sourceType: DataType, targetType: DataType): 
DataFrame =
{

val columnsToBeCasted = df.schema
  .filter(s => s.dataType == sourceType)

if (columnsToBeCasted.length > 0)
{
  println(s"Found ${columnsToBeCasted.length} columns " +
    s"(${columnsToBeCasted.map(s => s.name).mkString(",")})" +
    s" - casting to ${targetType.typeName.capitalize}Type")
}

columnsToBeCasted.foldLeft(df)
{ (foldedDf, col) =>
  castColumnTo(foldedDf, col.name, targetType)
}
}
Run Code Online (Sandbox Code Playgroud)

此方法将在每次调用时返回一个新实例

  private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): 
DataFrame =
{

//println("castColumnTo")
df.withColumn(cn, df(cn).cast(tpe)

)
}
Run Code Online (Sandbox Code Playgroud)

Joe*_*e K 6

差异基本上是懒惰.返回的每个新DataFrame都未在内存中实现.它只存储基础DataFrame和应该应用于它的函数.它本质上是一个如何创建一些数据的执行计划,而不是数据本身.

当实际执行并将结果保存到某个地方时,所有1000个操作可以并行应用于每一行,因此您将获得1个额外的输出DataFrame.Spark将尽可能多的操作压缩在一起,并且不会实现任何不必要的内容或者未明确请求保存或缓存的内容.