如何在spark中的每个分区中对数据进行排序?

Guo*_*Guo 2 apache-spark

有一些数据:

a   2
b   2
c   2
a   1
c   3
a   3
c   1
b   3
b   1
Run Code Online (Sandbox Code Playgroud)

当我重新分区数据,没有排序时,代码是:

val sc = new SparkContext
val file = sc.textFile(args(0)).map { a => {
           val splits = a.split("\t")
           (new MyObject(splits(0), splits(1).toInt),"") } }
           .partitionBy(new MyPartitioner(3)) //.sortByKey()    no sort
Run Code Online (Sandbox Code Playgroud)

结果是:

//file:part-00000
(a  2,)
(a  1,)
(a  3,)

//file:part-00001
(b  2,)
(b  3,)
(b  1,)

//file:part-00002
(c  2,)
(c  3,)
(c  1,)
Run Code Online (Sandbox Code Playgroud)

当我重新分区数据和排序时,代码是:

val sc = new SparkContext
val file = sc.textFile(args(0)).map { a => {
           val splits = a.split("\t")
           (new MyObject(splits(0), splits(1).toInt),"") } }
           .partitionBy(new MyPartitioner(3)).sortByKey() 
Run Code Online (Sandbox Code Playgroud)

结果是(这不是我想要的,排序的数据会影响原始分区):

//file:part-00000
(a  1,)
(a  2,)
(a  3,)
(b  1,)

//file:part-00001
(b  2,)
(b  3,)
(c  1,)

//file:part-00002
(c  2,)
(c  3,)
Run Code Online (Sandbox Code Playgroud)

我期望的结果是:

//file:part-00000
(a  1,)
(a  2,)
(a  3,)

//file:part-00001
(b  1,)
(b  2,)
(b  3,)

//file:part-00002
(c  1,)
(c  2,)
(c  3,)
Run Code Online (Sandbox Code Playgroud)

你可以帮帮我吗?非常感谢你!

sec*_*ree 6

sortWithinPartitions功能Datasets也有效。

http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

因此,您可以使用以下样式

df.repartition(col("A"), col("B")).sortWithinPartitions(desc("C")) ...
Run Code Online (Sandbox Code Playgroud)