具有大量列的数据框上的火花窗口功能

cri*_*aru 4 apache-spark spark-dataframe

我有一个从csv文件读取的ML数据框。它包含三种类型的列:

ID时间戳记Feature1 Feature2 ... Feature_n

其中n是〜500(在ML术语中为500个特征)。数据集中的行总数约为1.6亿。

由于这是先前完全连接的结果,因此许多功能没有设置值。

我的目标是运行一个“填充”功能(fillna样式为python pandas),其中每个空功能值都将根据ID和Date设置为该列的先前可用值。

我正在尝试通过以下spark 2.2.1代码实现这一目标:

 val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)

 val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)

 val columns = Array(...) //first 30 columns initially, just to see it working

val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
      originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
    }
Run Code Online (Sandbox Code Playgroud)

我正在Spark 2.2.1上的Amazon EMR的4 m4.large实例上运行此作业。并启用了动态分配。

作业运行了2小时以上而未完成。

我在代码级别上做错了吗?给定数据和实例的大小,我认为它应该在合理的时间内完成?而且我什至没有尝试使用全部500列,只有30列!

在容器日志中,我看到的都是许多这样的日志:

INFO codegen.CodeGenerator:166.677493毫秒内生成的代码

INFO execute.ExternalAppendOnlyUnsafeRowArray:已达到4096行的溢出阈值,切换到org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

我尝试将参数spark.sql.windowExec.buffer.spill.threshold设置为更大的值,而没有任何影响。还有其他我应该知道的设置吗?那两条线是我在任何容器日志中看到的唯一一行。

在Ganglia中,我看到大多数CPU内核在完全使用情况附近达到峰值,但是内存使用情况低于可用的最大值。所有执行者均已分配并正在工作。

cri*_*aru 5

我设法不使用withColumn调用就重写了对逻辑。显然,它们对于大量列可能非常慢,因此我也遇到了stackoverflow错误。

我很想知道为什么会有如此巨大的差异-以及查询计划执行在幕后究竟发生了什么,这使得withColumns重复调用如此缓慢。

证明非常有用的链接:Spark Jira问题此stackoverflow问题

    var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)    
    val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
    rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
    rawDataset.write.option("header", "true").csv(outputLocation)
Run Code Online (Sandbox Code Playgroud)