如何在Scala Spark Job中的多个键上使用ReduceByKey

Vin*_*ent 6 scala mapreduce avro apache-spark

我是相对较新的火花,我正在尝试同时按多个键分组数据.

我有一些我映射的数据,所以最终看起来像这样:

((K1,K2,K3),(V1,V2))

我的目标是分组(K1,K2,K3)并分别将V1和V2加起来以得到:

((K1,K2,K3),(SUM(V1),SUM(V2))

这是我到目前为止的代码:

val filepath  = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)            
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd

val mappedDataRDD = dataRDD.map{
   case (v, w, x, y, z) => ((v,w,x), (y, z))
}.reduceByKey((x,y)=> ???)
Run Code Online (Sandbox Code Playgroud)

所以我正在寻找如何reduceByKey所以我可以按(v,w,x)键分组并对y和z求和.

Sar*_*ngh 5

我认为您正在寻找并且应该使用的是aggregateByKey.

该方法采用两个参数组。第一个参数组取累加器的起始值。第二个参数组有两个功能,

  1. 将事物累积到累加器中的函数。
  2. 结合两个累加器的函数。

现在您可以按如下方式使用它,

val (accZeroY, accZeroZ): (Long, Long) = (0, 0) 

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .aggregateByKey((accZeroY, accZeroZ))(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )
Run Code Online (Sandbox Code Playgroud)

正如您应该已经观察到的,在这种情况下,第二个参数组中的两个函数实际上是相同的。仅当与或type of the needed accumulation中的值类型相同时才有可能。key-value-RDDPairRDD

在这种情况下,您还可以使用reduceByKey它,您可以将其视为aggregateByKey将相同的函数作为两个函数参数传递,

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .reduceByKey(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
  )
Run Code Online (Sandbox Code Playgroud)

但在我看来,你should NOT使用reduceBykey. 我建议使用的原因aggregateByKey是因为大型数据集上的值累积有时会产生超出您的类型范围的结果。

例如,在您的情况下,我怀疑您(x, y)实际上是 an(Int, Int)并且您想使用(v, w, x)as key 来累积它。Int但每当你大量添加时......请记住,结果可能会超出你Int可以处理的范围。

所以...你会希望你的积累类型是具有更大范围的东西,(Int, Int)喜欢(Long, Long)reduceByKey不允许你这样做。所以......我会说也许你正在寻找并且应该使用aggregateByKey