Spark Dataframe:访问地图功能中的下一条记录

mha*_*dad 3 scala apache-spark

我有一个带时间戳列的DF,按此列排序.有没有办法做到这一点:对于每条记录,访问下一条记录来计算两条线之间的时间差异?我不认为这在地图功能中是可能的,因为可以在不同的节点上处理这两行.

谢谢!

Dan*_*ula 5

对于Spark 1.4或更高版本,如果您可以使用Hive Context,则以下代码可能适合您:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql._

val hc = new HiveContext(sc)
val df = hc.read.format("...").load("...")

val timestamp_column = df("timestamp_column")
val next_row_timestamp = lead(timestamp_column, 1).over(Window.orderBy(timestamp_column))

val newDF = df.withColumn("time_difference", next_row_timestamp.cast(LongType) - timestamp_column.cast(LongType))
Run Code Online (Sandbox Code Playgroud)

说明:

在这段代码中,我使用lead(e: Column, offset: Int)functions包(doc)中提供的窗口函数.该函数实际上创建了一个新列,其中列e(timestamp_column在示例中)中的数据被offset(1在示例中)中的数据所覆盖.要正常工作,必须跟一个over(window: WindowSpec)调用,该调用使用Window对象定义一个窗口.该窗口可以由分区和订单组成.在这种情况下,我只使用订单设置Window.orderBy.

最后,我添加一个列,其中两列之间的差异(或毫秒?不确定)与使用的原始DataFrame相同withColumn.

有关更多详细信息,以下链接可以很好地解释这个想法,并提供示例:https: //databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html


编辑:

正如评论中指出的那样,上述解决方案效率非常低.作为替代方案,可以使用RDD解决方案:

val newRDD = df.rdd.zipWithIndex.flatMap {
  case (row, idx) => (0 to 1).map { lag => (idx - lag, row) }
}
.groupByKey
.values
.map { pair =>
  val pairArray = pair.toArray
  val timeDiff = {
    if (pairArray.length == 1) null
    else pairArray(1).getAs[java.sql.Timestamp]("timestamp_column").getTime - pairArray(0).getAs[java.sql.Timestamp]("timestamp_column").getTime
  }
  Row.merge(Row(timeDiff), pairArray(0))
}

val newSchema = StructType(StructField("time_diff", LongType, true) +: df.schema.fields)
val newDf = df.sqlContext.createDataFrame(newRDD, newSchema)
Run Code Online (Sandbox Code Playgroud)

结果数据框newDF将有一个新列"time_diff",其中包含当前行与下一行之间的时间差(以毫秒为单位).

  • 这是一种选择.另一个是从MLlib滑动.窗口函数的问题是没有`PARTITION BY`这几乎等同于`repartition(1)`. (2认同)