如何在Spark中按分区对键/值进行分组?

man*_*ato 4 parallel-processing apache-spark spark-streaming

我有一个Spark Streaming应用程序,它每秒接收几条JSON消息,每个消息都有一个标识其源的ID.

使用此ID作为键,我能够执行a MapPartitionsToPair,从而创建一个JavaPairDStream,其中包含键/值对的RDD,每个分区一个键值对(因此,如果我收到5个JSON消息,例如,我得到一个RDD与5个分区,每个分区都将消息的ID作为密钥,并将JSON消息本身作为值).

我现在想做的是,我想将具有相同键的所有值分组到同一个分区中.因此,例如,如果我有3个带有键'a'的分区和2个带有键'b'的分区,我想创建一个带有2个分区而不是5个分区的新RDD,每个分区包含一个键所具有的所有值,一个用于'a'和一个'b'.

我怎么能做到这一点?到目前为止这是我的代码:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

            ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();

            while (stringIterator.hasNext()){
                String c=stringIterator.next();
                if(c==null){
                    return null;

                }

                JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
                String key= retMap.getSid();
                Tuple2<String,String> b= new Tuple2<String,String>(key,c);
                a.add(b);

                System.out.print(b._1+"_"+b._2);
                // }
                //break;
            }


            return a;
        }
    });
Run Code Online (Sandbox Code Playgroud)

//我创建了一个JavaPairDStream,其中每个分区包含一个键/值对.

我尝试使用grouByKey(),但无论消息的数量是多少,我的分区号都是2.

我该怎么做?非常感谢.

jav*_*dba 5

你可以使用

groupByKey(Integer numPartitions)
Run Code Online (Sandbox Code Playgroud)

并设置numPartitions等于您拥有的不同键的数量.

但是......你需要知道前面有多少个不同的按键.你有那些信息吗?可能不是.那么......你需要做一些额外的(/冗余的)工作.例如使用

countByKey
Run Code Online (Sandbox Code Playgroud)

作为第一步.这比groupByKey快 - 所以至少你没有把总处理时间加倍.

更新 OP问他们为什么默认获得2个分区.

默认groupByKey使用defaultPartitioner()方法

groupByKey(defaultPartitioner(self))
Run Code Online (Sandbox Code Playgroud)
  • Partitioner从具有最大基数的父分区中选择.

- 或者它会使用 spark.default.parallelism