Vin*_*ent 6 scala mapreduce avro apache-spark
我是相对较新的火花,我正在尝试同时按多个键分组数据.
我有一些我映射的数据,所以最终看起来像这样:
((K1,K2,K3),(V1,V2))
我的目标是分组(K1,K2,K3)并分别将V1和V2加起来以得到:
((K1,K2,K3),(SUM(V1),SUM(V2))
这是我到目前为止的代码:
val filepath = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd
val mappedDataRDD = dataRDD.map{
case (v, w, x, y, z) => ((v,w,x), (y, z))
}.reduceByKey((x,y)=> ???)
Run Code Online (Sandbox Code Playgroud)
所以我正在寻找如何reduceByKey所以我可以按(v,w,x)键分组并对y和z求和.
我认为您正在寻找并且应该使用的是aggregateByKey.
该方法采用两个参数组。第一个参数组取累加器的起始值。第二个参数组有两个功能,
现在您可以按如下方式使用它,
val (accZeroY, accZeroZ): (Long, Long) = (0, 0)
val mappedDataRDD = dataRDD
.map({
case (v, w, x, y, z) => ((v,w,x), (y, z))
})
.aggregateByKey((accZeroY, accZeroZ))(
{ case ((accY, accZ), (y, z)) => (accY + y, accZ + z) }
{ case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
)
Run Code Online (Sandbox Code Playgroud)
正如您应该已经观察到的,在这种情况下,第二个参数组中的两个函数实际上是相同的。仅当与或type of the needed accumulation中的值类型相同时才有可能。key-value-RDDPairRDD
在这种情况下,您还可以使用reduceByKey它,您可以将其视为aggregateByKey将相同的函数作为两个函数参数传递,
val mappedDataRDD = dataRDD
.map({
case (v, w, x, y, z) => ((v,w,x), (y, z))
})
.reduceByKey(
{ case ((accY, accZ), (y, z)) => (accY + y, accZ + z) }
)
Run Code Online (Sandbox Code Playgroud)
但在我看来,你should NOT使用reduceBykey. 我建议使用的原因aggregateByKey是因为大型数据集上的值累积有时会产生超出您的类型范围的结果。
例如,在您的情况下,我怀疑您(x, y)实际上是 an(Int, Int)并且您想使用(v, w, x)as key 来累积它。Int但每当你大量添加时......请记住,结果可能会超出你Int可以处理的范围。
所以...你会希望你的积累类型是具有更大范围的东西,(Int, Int)喜欢(Long, Long)但reduceByKey不允许你这样做。所以......我会说也许你正在寻找并且应该使用aggregateByKey