首先按值排序JavaPairRDD然后按键排序

Jay*_*Jay 12 java hadoop apache-spark

我正在尝试按值对RDD进行排序,如果多个值相等,那么我需要通过按字典顺序键这些值.

代码:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
});
Run Code Online (Sandbox Code Playgroud)

到目前为止我所做的是,使用takeOrdered并提供一个CustomComperator,但由于takeOrdered无法处理大量数据,因此在运行代码时它会一直存在(它占用了操作系统无法处理的大量内存):

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {

    @Override
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < String, Long > (t._1, t._2.count);
    }
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);
Run Code Online (Sandbox Code Playgroud)

Comperator:

    static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
        private static final long serialVersionUID = 1L;

        private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();

        @Override
        public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
            if (o1._2.compareTo(o2._2) == 0) {
                return o1._1.compareTo(o2._1);
            }
            return -o1._2.compareTo(o2._2);
        }
}
Run Code Online (Sandbox Code Playgroud)

错误:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s
Run Code Online (Sandbox Code Playgroud)

你会如何排序这个RDD?你如何TopKMovies考虑考虑价值,并且在字面上按字典顺序键.

谢谢.

Jay*_*Jay 3

<String, Long>在将PairRDD 映射到< Tuple2<String,Long> , Long>PairRDD后,使用带有比较器和分区的 sortByKey 解决了问题

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long > () {

    @Override
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
        return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count);
    }
}).sortByKey(new TupleMapLongComparator(), true, 100);


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() {

    @Override
    public Tuple2<String, Long> call(
            Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
        return new Tuple2 < String, Long > (t._1._1, t._1._2);
    }

});
Run Code Online (Sandbox Code Playgroud)

比较器:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable {
    @Override
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) {

        if (tuple1._2.compareTo(tuple2._2) == 0) {
            return tuple1._1.compareTo(tuple2._1);
        }
        return -tuple1._2.compareTo(tuple2._2);
    }
}
Run Code Online (Sandbox Code Playgroud)