How to use java.time.LocalDate in a Dataset (fails with java.lang.UnsupportedOperationException: No Encoder found)?

Jim*_*yan 11 scala apache-spark apache-spark-sql

  • Spark 2.1.1
  • Scala 2.11.8
  • Java 8
  • Linux Ubuntu 16.04 LTS

I want to convert my RDD into a Dataset. For this, I use the implicit method toDS(), which gives me the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "date")
- root class: "observatory.TemperatureRow"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)

In my case, I have to use the type java.time.LocalDate; I cannot use java.sql.Date. I have read that I need to tell Spark how to convert the Java type into an SQL type, so in that direction I built the following two implicit conversion functions:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature)
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)
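As a standalone sanity check (outside Spark), the two conversions above do round-trip a sample row. Note, though, that Spark derives encoders from the declared element type of the RDD, so an implicit conversion between the two case classes is never consulted; the RDD has to be mapped explicitly (e.g. `temps.map(toSerialized).toDS`). A minimal sketch of the round trip, using the case classes from the question:

```scala
import java.time.LocalDate

case class Location(lat: Double, lon: Double)
case class TemperatureRow(date: LocalDate, location: Location, temperature: Double)
case class EncodedTemperatureRow(date: String, location: Location, temperature: Double)

// Same conversions as in the question, as plain functions.
def toSerialized(t: TemperatureRow): EncodedTemperatureRow =
  EncodedTemperatureRow(t.date.toString, t.location, t.temperature)

def fromSerialized(t: EncodedTemperatureRow): TemperatureRow =
  TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature)

val row = TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4, 5.1), 4.9)
val encoded = toSerialized(row)       // date becomes the ISO-8601 string "2017-01-01"
val roundTripped = fromSerialized(encoded)
```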

下面,我的应用程序的一些代码:

case class Location(lat: Double, lon: Double)

case class TemperatureRow(
                             date: LocalDate,
                             location: Location,
                             temperature: Double
                         )

case class EncodedTemperatureRow(
                             date: String,
                             location: Location,
                             temperature: Double
                         )

val s = Seq[TemperatureRow](
                    TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9),
                    TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5)
                )

import spark.implicits._
val temps: RDD[TemperatureRow] = sc.parallelize(s)
val tempsDS = temps.toDS

I don't understand why Spark searches for an encoder for java.time.LocalDate, since I have provided implicit conversions between TemperatureRow and EncodedTemperatureRow...

Jac*_*ski 12

java.time.LocalDate is not supported up to and including Spark 2.2 (I have been trying to write an Encoder for that type for some time and have failed).

You have to convert java.time.LocalDate to some other supported type; java.sql.Timestamp or java.sql.Date are the natural candidates.
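That conversion needs no Spark at all: the JDK ships `java.sql.Date.valueOf` and `java.sql.Date#toLocalDate`, which convert a calendar date losslessly in both directions. A minimal sketch (a case class declared with java.sql.Date in place of LocalDate would then pick up an encoder from `spark.implicits._`):

```scala
import java.time.LocalDate

// Convert the unsupported JDK8 type to the Spark-supported java.sql.Date...
val local: LocalDate = LocalDate.parse("2017-01-01")
val sqlDate: java.sql.Date = java.sql.Date.valueOf(local)

// ...and back again after reading, to keep java.time in the rest of the code.
val back: LocalDate = sqlDate.toLocalDate
```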

  • And to clarify for anyone landing here: even Spark 2.2 cannot handle the JDK8 date/time classes. The conversions in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala only look at pre-JDK8 date/time classes. (2)
  • Spark 3 appears to support it. See https://issues.apache.org/jira/browse/SPARK-27222 and https://github.com/apache/spark/commit/0f4f8160e6d01d2e263adcf39d53bd0a03fc1b73#diff-f52e4a77ff9291d86359d609a9757781 (2)