Mar*_*nne 10 scala window-functions apache-spark apache-spark-sql
给定一个数据帧 df
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
Run Code Online (Sandbox Code Playgroud)
我想创建一个运行计数器或索引,
从而
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
Run Code Online (Sandbox Code Playgroud)
这是我可以通过窗口功能实现的,例如
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
Run Code Online (Sandbox Code Playgroud)
不幸的是,Spark 1.4.1不支持常规数据帧的窗口函数:
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
Run Code Online (Sandbox Code Playgroud)
谢谢!
你也可以HiveContext
用于本地DataFrames
,除非你有充分的理由不这样做,否则无论如何都可能是一个好主意.这是一个默认的SQLContext
可用spark-shell
和pyspark
外壳(如现在sparkR
似乎使用纯SQLContext
)和它的解析器是通过推荐星火SQL和数据帧指南.
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
object HiveContextTest {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Hive Context")
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(
("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
).toDF("k", "v")
val w = Window.partitionBy($"k").orderBy($"v")
df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
}
}
Run Code Online (Sandbox Code Playgroud)
您可以使用RDD执行此操作.就个人而言,我发现RDD的API更有意义 - 我并不总是希望我的数据像数据帧一样"平坦".
val df = sqlContext.sql("select 1, '2015-09-01'"
).unionAll(sqlContext.sql("select 2, '2015-09-01'")
).unionAll(sqlContext.sql("select 1, '2015-09-03'")
).unionAll(sqlContext.sql("select 1, '2015-09-04'")
).unionAll(sqlContext.sql("select 2, '2015-09-04'"))
// dataframe as an RDD (of Row objects)
df.rdd
// grouping by the first column of the row
.groupBy(r => r(0))
// map each group - an Iterable[Row] - to a list and sort by the second column
.map(g => g._2.toList.sortBy(row => row(1).toString))
.collect()
Run Code Online (Sandbox Code Playgroud)
以上结果如下:
Array[List[org.apache.spark.sql.Row]] =
Array(
List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]),
List([2,2015-09-01], [2,2015-09-04]))
Run Code Online (Sandbox Code Playgroud)
如果你想要在'组'中的位置,你可以使用zipWithIndex
.
df.rdd.groupBy(r => r(0)).map(g =>
g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()
Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
List(([2,2015-09-01],0), ([2,2015-09-04],1)))
Run Code Online (Sandbox Code Playgroud)
您可以Row
使用FlatMap将其平移回一个简单的List/Array 对象,但是如果您需要在'group'上执行任何不是一个好主意的事情.
像这样使用RDD的缺点是从DataFrame转换为RDD并再次返回是很繁琐的.
归档时间: |
|
查看次数: |
12717 次 |
最近记录: |