Spark和SparkSQL:如何模仿窗口功能?

Mar*_*nne 10 scala window-functions apache-spark apache-spark-sql

描述

给定一个数据帧 df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04
Run Code Online (Sandbox Code Playgroud)

我想创建一个运行计数器或索引,

  • 按相同的ID分组
  • 按该组中的日期排序,

从而

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2
Run Code Online (Sandbox Code Playgroud)

这是我可以通过窗口功能实现的,例如

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
Run Code Online (Sandbox Code Playgroud)

不幸的是,Spark 1.4.1不支持常规数据帧的窗口函数:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
Run Code Online (Sandbox Code Playgroud)

问题

  • 如何在不使用窗口函数的情况下在当前Spark 1.4.1上实现上述计算?
  • Spark中何时支持常规数据帧的窗口函数?

谢谢!

zer*_*323 7

你也可以HiveContext用于本地DataFrames,除非你有充分的理由不这样做,否则无论如何都可能是一个好主意.这是一个默认的SQLContext可用spark-shellpyspark外壳(如现在sparkR似乎使用纯SQLContext)和它的解析器是通过推荐星火SQL和数据帧指南.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}
Run Code Online (Sandbox Code Playgroud)


Kir*_*rst 6

您可以使用RDD执行此操作.就个人而言,我发现RDD的API更有意义 - 我并不总是希望我的数据像数据帧一样"平坦".

val df = sqlContext.sql("select 1, '2015-09-01'"
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()
Run Code Online (Sandbox Code Playgroud)

以上结果如下:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
  List([2,2015-09-01], [2,2015-09-04]))
Run Code Online (Sandbox Code Playgroud)

如果你想要在'组'中的位置,你可以使用zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
  List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
  List(([2,2015-09-01],0), ([2,2015-09-04],1)))
Run Code Online (Sandbox Code Playgroud)

可以Row使用FlatMap将其平移回一个简单的List/Array 对象,但是如果您需要在'group'上执行任何不是一个好主意的事情.

像这样使用RDD的缺点是从DataFrame转换为RDD并再次返回是很繁琐的.