cri*_*aru 4 apache-spark spark-dataframe
我有一个从csv文件读取的ML数据框。它包含三种类型的列:
ID时间戳记Feature1 Feature2 ... Feature_n
其中n是〜500(在ML术语中为500个特征)。数据集中的行总数约为1.6亿。
由于这是先前完全连接的结果,因此许多功能没有设置值。
我的目标是运行一个“填充”功能(fillna样式为python pandas),其中每个空功能值都将根据ID和Date设置为该列的先前可用值。
我正在尝试通过以下spark 2.2.1代码实现这一目标:
val rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)
val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(-50000, -1)
val columns = Array(...) //first 30 columns initially, just to see it working
val rawDataSetFilled = columns.foldLeft(rawDataset) { (originalDF, columnToFill) =>
originalDF.withColumn(columnToFill, coalesce(col(columnToFill), last(col(columnToFill), ignoreNulls = true).over(window)))
}
Run Code Online (Sandbox Code Playgroud)
我正在Spark 2.2.1上的Amazon EMR的4 m4.large实例上运行此作业。并启用了动态分配。
作业运行了2小时以上而未完成。
我在代码级别上做错了吗?给定数据和实例的大小,我认为它应该在合理的时间内完成?而且我什至没有尝试使用全部500列,只有30列!
在容器日志中,我看到的都是许多这样的日志:
INFO codegen.CodeGenerator:166.677493毫秒内生成的代码
INFO execute.ExternalAppendOnlyUnsafeRowArray:已达到4096行的溢出阈值,切换到org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
我尝试将参数spark.sql.windowExec.buffer.spill.threshold设置为更大的值,而没有任何影响。还有其他我应该知道的设置吗?那两条线是我在任何容器日志中看到的唯一一行。
在Ganglia中,我看到大多数CPU内核在完全使用情况附近达到峰值,但是内存使用情况低于可用的最大值。所有执行者均已分配并正在工作。
我设法不使用withColumn调用就重写了对折逻辑。显然,它们对于大量列可能非常慢,因此我也遇到了stackoverflow错误。
我很想知道为什么会有如此巨大的差异-以及查询计划执行在幕后究竟发生了什么,这使得withColumns重复调用如此缓慢。
证明非常有用的链接:Spark Jira问题和此stackoverflow问题
var rawDataset = sparkSession.read.option("header", "true").csv(inputLocation)
val window = Window.partitionBy("ID").orderBy("DATE").rowsBetween(Window.unboundedPreceding, Window.currentRow)
rawDataset = rawDataset.select(rawDataset.columns.map(column => coalesce(col(column), last(col(column), ignoreNulls = true).over(window)).alias(column)): _*)
rawDataset.write.option("header", "true").csv(outputLocation)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1822 次 |
| 最近记录: |