如何获得Spark RDD的SQL row_number等价物?

Gle*_*ker 25 sql row-number apache-spark rdd

我需要为包含许多列的数据表生成row_numbers的完整列表.

在SQL中,这将如下所示:

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;
Run Code Online (Sandbox Code Playgroud)

现在,让我们说在Spark中我有一个形式为(K,V)的RDD,其中V =(col1,col2,col3),所以我的条目就像

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Run Code Online (Sandbox Code Playgroud)

我想使用sortBy(),sortWith(),sortByKey(),zipWithIndex等命令对它们进行排序,并使用正确的row_number创建一个新的RDD.

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
Run Code Online (Sandbox Code Playgroud)

(我不关心括号,所以表格也可以是(K,(col1,col2,col3,rownum))而不是)

我该怎么做呢?

这是我的第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???
Run Code Online (Sandbox Code Playgroud)

另请注意,函数sortBy不能直接应用于RDD,但必须首先运行collect(),然后输出也不是RDD,而是数组

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Run Code Online (Sandbox Code Playgroud)

这里有一些进展,但仍未分区:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Run Code Online (Sandbox Code Playgroud)

dnl*_*rky 26

row_number() over (partition by ... order by ...)功能已添加到Spark 1.4中.这个答案使用PySpark/DataFrames.

创建一个测试DataFrame:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)),
     Row(k="key1", v=(1,4,7)),
     Row(k="key1", v=(2,2,3)),
     Row(k="key2", v=(5,5,5)),
     Row(k="key2", v=(5,5,9)),
     Row(k="key2", v=(7,5,5))
    )
).toDF()
Run Code Online (Sandbox Code Playgroud)

添加分区的行号:

from pyspark.sql.window import Window

(testDF
 .select("k", "v",
         F.rowNumber()
         .over(Window
               .partitionBy("k")
               .orderBy("k")
              )
         .alias("rowNum")
        )
 .show()
)

+----+-------+------+
|   k|      v|rowNum|
+----+-------+------+
|key1|[1,2,3]|     1|
|key1|[1,4,7]|     2|
|key1|[2,2,3]|     3|
|key2|[5,5,5]|     1|
|key2|[5,5,9]|     2|
|key2|[7,5,5]|     3|
+----+-------+------+
Run Code Online (Sandbox Code Playgroud)


Gui*_*e G 5

这是你提出的一个有趣的问题。我将用 Python 回答它,但我相信您将能够无缝地转换为 Scala。

这是我将如何解决它:

1- 简化您的数据:

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
Run Code Online (Sandbox Code Playgroud)

temp2 现在是一个“真正的”键值对。它看起来像这样:

[
((3, 4), (5, 5, 5)),  
((3, 4), (5, 5, 9)),   
((3, 4), (7, 5, 5)),   
((1, 2), (1, 2, 3)),  
((1, 2), (1, 4, 7)),   
((1, 2), (2, 2, 3))
Run Code Online (Sandbox Code Playgroud)

]

2-然后,使用group-by功能重现PARTITION BY的效果:

temp3 = temp2.groupByKey()
Run Code Online (Sandbox Code Playgroud)

temp3 现在是一个有 2 行的 RDD:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
 ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
Run Code Online (Sandbox Code Playgroud)

3- 现在,您需要为 RDD 的每个值应用排名函数。在 python 中,我将使用简单的排序函数(枚举将创建您的 row_number 列):

 temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
Run Code Online (Sandbox Code Playgroud)

请注意,要实现您的特定订单,您需要提供正确的“key”参数(在 python 中,我只会创建一个像这样的 lambda 函数:

lambda tuple : (tuple[0],-tuple[1],tuple[2])
Run Code Online (Sandbox Code Playgroud)

最后(没有关键参数函数,它看起来像这样):

[
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2))
Run Code Online (Sandbox Code Playgroud)

]

希望有帮助!

祝你好运。