Why are Spark reduceByKey results inconsistent?

Vik*_*del 4 hadoop scala apache-spark

I am trying to count the number of occurrences of each line using Spark with Scala.
Here is my input:

1 vikram
2 sachin
3 shobit
4 alok
5 akul
5 akul
1 vikram
1 vikram
3 shobit
10 ashu
5 akul
1 vikram
2 sachin
7 vikram

Now I create two separate RDDs, as shown below.

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0), 1) ) // creating a tuple (key, 1)
// now if I create an RDD as
val rd1 = m1.reduceByKey((a, b) => a + b)
rd1.collect().foreach(println)
// I get the correct output, every time:
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

// but if I create an RDD as
val rd2 = m1.reduceByKey((a, b) => a + 1)
rd2.collect().foreach(println)
// I get an inconsistent result, i.e. sometimes this (WRONG):
// output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
// and sometimes this (CORRECT):
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

I can't understand why this happens, or which one to use where. I also tried creating an RDD as:

val m2 = f1.map(s => (s, 1))
val rd3 = m2.reduceByKey((a, b) => a + 1)
// the same issue occurs with a+1, but everything works fine with a+b

Tza*_*har 7

reduceByKey assumes that the function passed to it is commutative and associative (as the docs clearly state). Your first function, (a, b) => a + b, is both; (a, b) => a + 1 is not.

Why? For one thing, reduceByKey applies the supplied function within each partition, and then applies it again to combine the results of all the partitions. In other words, b is not always 1, so using a+1 is simply incorrect.
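To see concretely that b is not always 1, here is a minimal plain-Scala sketch (no Spark needed; the 2 + 2 split across partitions is an assumption) of the two-stage reduction for a key that appears four times:

val plus: (Int, Int) => Int = (a, b) => a + b
val plusOne: (Int, Int) => Int = (a, b) => a + 1

// stage 1: reduce within each (assumed) partition
val partial1 = Seq(1, 1).reduce(plus)    // 2
val partial2 = Seq(1, 1).reduce(plus)    // 2
// stage 2: combine the partial results
plus(partial1, partial2)                 // 4 -- the correct count

val p1 = Seq(1, 1).reduce(plusOne)       // 2
val p2 = Seq(1, 1).reduce(plusOne)       // 2
plusOne(p1, p2)                          // 3 -- the count carried by b is discarded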

Consider the following scenario: the input contains 4 records, split into two partitions:

(aa, 1)
(aa, 1)

(aa, 1)
(cc, 1)

reduceByKey(f) on this input might be computed as follows:

val intermediate1 = f((aa, 1), (aa, 1)) 
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)

Now, let's trace this with f = (a, b) => a + b:

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)

And with f = (a, b) => a + 1:

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong:
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)
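You can try to reproduce this directly. A small sketch, assuming a live SparkContext named sc; forcing 2 partitions makes the cross-partition combine step happen:

val pairs = sc.parallelize(Seq(("aa", 1), ("aa", 1), ("aa", 1), ("cc", 1)), 2)
pairs.reduceByKey((a, b) => a + 1).collect()
// may yield (aa,2) or (aa,3) across runs, while (a, b) => a + b always yields (aa,3)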

The main point: the order of the intermediate computations is not guaranteed and may change between executions, and for the latter, non-commutative and non-associative function, that means the result is sometimes simply wrong.
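If the goal is just a per-key count, the safe options are to stick with a commutative and associative function, or to let Spark count for you. A sketch reusing the m1 RDD from the question:

val counts = m1.reduceByKey(_ + _)   // commutative and associative: always correct
val local  = m1.countByKey()         // scala.collection.Map[String, Long] on the driver
// countByKey collects results to the driver, so use it only when the number of distinct keys is small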