Why is "Unable to find encoder for type stored in a Dataset" when creating a Dataset of a custom case class?

cla*_*lay 53 scala apache-spark apache-spark-dataset apache-spark-encoders

Spark 2.0 (final) with Scala 2.11.8. The following super-simple code yields the compilation error Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    val dataset = sparkSession.createDataset(dataList)
  }
}

zer*_*323 77

Spark Datasets require Encoders for the data type which is about to be stored. For common types (atomics, product types) there are a number of predefined encoders available, but you have to import them first from SparkSession.implicits to make it work:

val sparkSession: SparkSession = ???
import sparkSession.implicits._
val dataset = sparkSession.createDataset(dataList)
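Applied to the question's program, a minimal complete sketch (the only change is the import, added after the session is created):

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    // Brings the predefined implicit Encoders into scope, including an
    // Encoder[SimpleTuple] derived for the Product (case class) type.
    import sparkSession.implicits._

    val dataset = sparkSession.createDataset(dataList)
  }
}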

Alternatively, you can directly provide an explicit

import org.apache.spark.sql.{Encoder, Encoders}

val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])

or an implicit

implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)

Encoder for the stored type.

Note that Encoders also provides a number of predefined Encoders for atomic types, and Encoders for complex ones can be derived with ExpressionEncoder.
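
As a rough sketch of both points (reusing SimpleTuple, sparkSession, and dataList from the examples above):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Predefined encoder for an atomic type.
val intEncoder: Encoder[Int] = Encoders.scalaInt

// Encoder for a complex (Product) type, derived with ExpressionEncoder.
val tupleEncoder: Encoder[SimpleTuple] = ExpressionEncoder[SimpleTuple]()

val ints   = sparkSession.createDataset(List(1, 2, 3))(intEncoder)
val tuples = sparkSession.createDataset(dataList)(tupleEncoder)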


  • @DanCiborowski-MSFT `???` is [NotImplementedError](https://www.scala-lang.org/api/current/scala/NotImplementedError.html) - in other words, fill in the blank to suit your requirements. (2 upvotes)

MrP*_*per 45

For other users (the asker's is correct), note that it's also important that the case class is defined outside of the object scope. So:

Fails:

import org.apache.spark.sql.SparkSession

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()
    val dataset = sparkSession.createDataset(dataList)
  }
}

Adding the implicits still fails with the same error:

import org.apache.spark.sql.SparkSession

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Works:

import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}

Here's the relevant bug: https://issues.apache.org/jira/browse/SPARK-13540, so hopefully it will be fixed in the next release of Spark 2.

(Edit: looks like that bugfix actually made it into Spark 2.0.0... so I'm not sure why this still fails.)

  • This still seems to be an issue in Spark 2.4 (4 upvotes)
  • It still fails with Spark 3.0.1, and the case class needs to be outside the object scope. (3 upvotes)