枚举是 Spark PairRDD 导致问题的关键

Hbf*_*Hbf 2 java enums scala apache-spark

PairRDD当 RDD 的键是或包含枚举时,对 Spark的某些操作无法正常工作。

例如,下面的 Spark 代码需要两周的工作日并按工作日计算它们:

import java.time.DayOfWeek
val weekdays: Seq[(DayOfWeek, Int)] = DayOfWeek.values().map(dow => (dow, 1))
val numPartitions = 2 * weekdays.size
val result = sc
  .parallelize(weekdays ++ weekdays, numPartitions)
  .reduceByKey(_ + _)
  .collect
  .toSeq
println(result)
Run Code Online (Sandbox Code Playgroud)

在输出中,我希望每个工作日(例如,MONDAY)计数为 2,但是,在我的集群上,我得到:

WrappedArray(
  (THURSDAY,1), (SATURDAY,1), (WEDNESDAY,2), (SATURDAY,1),
  (MONDAY,2), (TUESDAY,2), (THURSDAY,1), (FRIDAY,2), (SUNDAY,2)
)
Run Code Online (Sandbox Code Playgroud)

如果您在具有单个节点(或设置numPartitions为 1)的集群上运行此程序,则结果是正确的(即,所有计数均为 2)。

Hbf*_*Hbf 6

SparkPairRDD的操作,如aggregateByKey(), reduceByKey()combineByKey()采用可选参数来指定PartitionerSpark 要使用的 。如果您没有明确指定分区器,HashPartitioner则使用Spark 的,它调用行的键的hashCode()方法并使用它来将行分配给分区。但是,hashCode()不能保证枚举在不同的 JVM 进程上是相同的——即使它们运行在相同的 Java 版本上。因此,SparkxyzByKey()操作无法正常工作。

在上面的例子中,(THURSDAY, 1)输入中有两对,每对都在不同的执行器上进行处理。该示例使用HashPartitioner具有 14 (= numPartitions) 个分区的 。由于(THURSDAY, 1).hashCode() % 14在这两个执行器上产生不同的结果,因此这两行被发送到不同的执行器进行归约,从而导致错误的结果。

底线:不要HashPartitioner与哈希码在不同 JVM 进程上不一致的对象一起使用。特别是,不能保证以下对象在不同的​​ JVM 进程上产生相同的哈希码:

  • Javaenum
  • sealed trait基于Scala的枚举:
sealed trait TraitEnum
object TEA extends TraitEnum
object TEB extends TraitEnum
Run Code Online (Sandbox Code Playgroud)
  • abstract class基于Scala的枚举:
sealed abstract class AbstractClassEnum
object ACA extends AbstractClassEnum
object ACB extends AbstractClassEnum
Run Code Online (Sandbox Code Playgroud)
  • 包含上述类型之一的嵌套对象的任何键(并且没有自定义hashCode()实现)。

但是,case class基于Scala的枚举具有一致的哈希码,因此可以安全使用:

sealed case class CaseClassEnum(…) # “…" must be a non-empty list of parameters
object CCA extends CaseClassEnum(…)
object CCB extends CaseClassEnum(…)
Run Code Online (Sandbox Code Playgroud)

附加信息: