Spark/Scala:前进填充最后一次观察

MrE*_*MrE 26 scala apache-spark apache-spark-sql

使用Spark 1.4.0,Scala 2.10

我一直试图找出一种方法来使用最后一次已知的观察来转发填充空值,但我没有看到一种简单的方法.我认为这是一件非常常见的事情,但找不到显示如何执行此操作的示例.

我看到函数向前转移填充NaN的值,或滞后/超前函数来填充或移位数据偏移量,但没有任何东西可以获取最后的已知值.

在线查看,我在R中看到很多关于同一件事的Q/A,但在Spark/Scala中没有.

我正在考虑在日期范围内进行映射,从结果中过滤出NaN并选择最后一个元素,但我想我对语法感到困惑.

使用DataFrames我尝试类似的东西

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Run Code Online (Sandbox Code Playgroud)

但这并没有让我任何地方.

过滤器部分不起作用; map函数返回一个spark.sql.Columns序列,但是filter函数需要返回一个Boolean,所以我需要从Column中获取一个值来测试,但似乎只有Column方法返回一个Column.

有没有办法在Spark上更"简单"地做到这一点?

感谢您的输入

编辑:

简单示例示例输入:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Run Code Online (Sandbox Code Playgroud)

预期产量:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Run Code Online (Sandbox Code Playgroud)

注意:

  1. 我有很多列,其中许多都有这种缺失的数据模式,但不是在相同的日期/时间.如果我需要,我将一次完成一列变换.

编辑:

按照@ zero323的回答我试过这样:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}
Run Code Online (Sandbox Code Playgroud)

广播变量最终作为没有空值的值列表.这是进步但我仍然无法使映射工作.但我什么都没得到,因为索引i中没有映射到原始数据,它映射到没有null的子集.

我在这里错过了什么?

编辑和解决方案(来自@ zero323的回答):

import org.apache.spark.sql.expressions.Window

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))
Run Code Online (Sandbox Code Playgroud)

如果您使用的是RDD而不是DataFrame,请参阅下面的零323答案以获取更多选项.上面的解决方案可能不是最有效的,但对我有用.如果您正在寻求优化,请查看RDD解决方案.

zer*_*323 18

初步答案(单个时间序列假设):

首先,如果你不能提供PARTITION BY条款,请尝试避免窗口函数.它将数据移动到单个分区,因此大多数时候它根本不可行.

你能做的就是填补RDD使用空白mapPartitionsWithIndex.由于您没有提供示例数据或预期输出,因此将此视为伪代码而不是真正的Scala程序:

  • 首先DataFrame按日期排序并转换为RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
    Run Code Online (Sandbox Code Playgroud)
  • next让我们找到每个分区的最后一个非null观察

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
    Run Code Online (Sandbox Code Playgroud)
  • 并将其转换Map为广播

    val toCarryBd = sc.broadcast(toCarry)
    
    Run Code Online (Sandbox Code Playgroud)
  • 最后映射分区再次填补空白:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
    Run Code Online (Sandbox Code Playgroud)
  • 最后转换回DataFrame

编辑(每组数据的分区/时间序列):

魔鬼在细节.如果你的数据完全被分区,那么整个问题就可以解决了groupBy.让我们假设您只是按类型"v"分区,T并且Date是一个整数时间戳:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}

val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))

val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)

val dfFilled = sqlContext.createDataFrame(rows, df.schema)
Run Code Online (Sandbox Code Playgroud)

这样您就可以同时填充所有列.

可以使用DataFrames而不是来回转换为RDD吗?

这取决于,虽然它不太可能有效.如果最大间隙相对较小,您可以执行以下操作:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column

val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 

// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}


// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))


// Finally select
val dfImputed = df.select($"*" :: lags: _*)
Run Code Online (Sandbox Code Playgroud)

它可以很容易地调整到每列使用不同的最大间隙.

一个更简单的方式来实现类似的结果在最新版的Spark是使用last具有ignoreNulls:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)

df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
Run Code Online (Sandbox Code Playgroud)

虽然可以在partitionBy全局范围内删除子句并应用此方法,但对于大型数据集而言,这会非常昂贵.