Hbf*_*Hbf 2 java enums scala apache-spark
PairRDD当 RDD 的键是或包含枚举时,对 Spark的某些操作无法正常工作。
例如,下面的 Spark 代码需要两周的工作日并按工作日计算它们:
import java.time.DayOfWeek
val weekdays: Seq[(DayOfWeek, Int)] = DayOfWeek.values().map(dow => (dow, 1))
val numPartitions = 2 * weekdays.size
val result = sc
.parallelize(weekdays ++ weekdays, numPartitions)
.reduceByKey(_ + _)
.collect
.toSeq
println(result)
Run Code Online (Sandbox Code Playgroud)
在输出中,我希望每个工作日(例如,MONDAY)计数为 2,但是,在我的集群上,我得到:
WrappedArray(
(THURSDAY,1), (SATURDAY,1), (WEDNESDAY,2), (SATURDAY,1),
(MONDAY,2), (TUESDAY,2), (THURSDAY,1), (FRIDAY,2), (SUNDAY,2)
)
Run Code Online (Sandbox Code Playgroud)
如果您在具有单个节点(或设置numPartitions为 1)的集群上运行此程序,则结果是正确的(即,所有计数均为 2)。
SparkPairRDD的操作,如aggregateByKey(), reduceByKey(),combineByKey()采用可选参数来指定PartitionerSpark 要使用的 。如果您没有明确指定分区器,HashPartitioner则使用Spark 的,它调用行的键的hashCode()方法并使用它来将行分配给分区。但是,hashCode()不能保证枚举在不同的 JVM 进程上是相同的——即使它们运行在相同的 Java 版本上。因此,SparkxyzByKey()操作无法正常工作。
在上面的例子中,(THURSDAY, 1)输入中有两对,每对都在不同的执行器上进行处理。该示例使用HashPartitioner具有 14 (= numPartitions) 个分区的 。由于(THURSDAY, 1).hashCode() % 14在这两个执行器上产生不同的结果,因此这两行被发送到不同的执行器进行归约,从而导致错误的结果。
底线:不要HashPartitioner与哈希码在不同 JVM 进程上不一致的对象一起使用。特别是,不能保证以下对象在不同的 JVM 进程上产生相同的哈希码:
enum的sealed trait基于Scala的枚举:sealed trait TraitEnum
object TEA extends TraitEnum
object TEB extends TraitEnum
Run Code Online (Sandbox Code Playgroud)
abstract class基于Scala的枚举:sealed abstract class AbstractClassEnum
object ACA extends AbstractClassEnum
object ACB extends AbstractClassEnum
Run Code Online (Sandbox Code Playgroud)
hashCode()实现)。但是,case class基于Scala的枚举具有一致的哈希码,因此可以安全使用:
sealed case class CaseClassEnum(…) # “…" must be a non-empty list of parameters
object CCA extends CaseClassEnum(…)
object CCB extends CaseClassEnum(…)
Run Code Online (Sandbox Code Playgroud)
附加信息:
| 归档时间: |
|
| 查看次数: |
873 次 |
| 最近记录: |