将 Scalaz 与 Spark 一起使用时出现不可序列化的异常

Kon*_*nos 2 scala scalaz apache-spark

我做了一个简单的例子来尝试将scalaz 库代码与 Apache Spark 1.5集成。

这是一个简单的 Spark 程序来说明我的问题:

package test

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

import ca.crim.deti.re.spark.sparkConf
import scalaz._
import scalaz.Scalaz._

object TestSpark {

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("Test").setMaster("local")
    val SC = new SparkContext(conf)
    val c = SC.parallelize(List(1, 2, 3, 4, 5))

    println(func1(c).count) // WORKS
    println(func2(c).count) // DOES NOT WORK.. NotSerializableException
  }

  // WORKS!
  def func1(rdd: RDD[Int]) = {
    rdd.filter { i => f(i, i) }
  }
  // DOES NOT WORK!
  def func2[I: Equal](rdd: RDD[I]) = {
    rdd.filter { i => f(i, i) }
  }

  def f[I: Equal](i1: I, i2: I) = {
    i1 === i2
  }
}
Run Code Online (Sandbox Code Playgroud)

我想func2通过Equal在函数定义中使用来完成工作。

在本地模式下在 Spark 环境中使用 执行时func2,出现以下异常:

使用 scalaz 时 Spark 环境异常

ade*_*rtc 5

由于您的函数有一个Equal[I]约束,Spark 正在关闭它并在进行分发时尝试对其进行序列化。由于scalaz.Equal类型类不是Serializable( https://github.com/scalaz/scalaz/blob/v7.2.0/core/src/main/scala/scalaz/Equal.scala#L10 ) Spark 在运行时失败了。

您可以Serialziable通过使用MeatLockerTwitter 的 chill 库来解决这个事实:https : //github.com/twitter/chill#the-meatlocker

或者,cats and algebra library(有一个Equal类似于你上面使用的类型类)有可序列化的类型类,你应该能够像上面那样使用那些没有问题的。