Jiv*_*van 4 scala apache-spark apache-spark-dataset apache-spark-encoders
在以下代码段中,该tryParquet函数尝试从Parquet文件加载数据集(如果存在).如果没有,它会计算,持久并返回提供的数据集计划:
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
sealed trait CustomRow
case class MyRow(
id: Int,
name: String
) extends CustomRow
val ds: Dataset[MyRow] =
Seq((1, "foo"),
(2, "bar"),
(3, "baz")).toDF("id", "name").as[MyRow]
def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
Try(session.read.parquet(path)) match {
case Success(df) => df.as[T] // <---- compile error here
case Failure(_) => {
target.write.parquet(path)
target
}
}
val readyDS: Dataset[MyRow] =
tryParquet(spark, "/path/to/file.parq", ds)
Run Code Online (Sandbox Code Playgroud)
但是,这会产生编译错误df.as[T]:
无法找到存储在数据集中的类型的编码器.导入spark.implicits._支持原始类型(Int,String等)和产品类型(case类).
将来的版本中将添加对序列化其他类型的支持.
案例成功(df)=> df.as [T]
可以通过使tryParquet强制df转换返回无类型DataFrame并让调用者强制转换为所需的构造函数来避免此问题.但是,在我们希望函数内部管理类型的情况下是否有任何解决方案?
Jiv*_*van 11
看起来可以通过Encoder在type参数中使用:
import org.apache.spark.sql.Encoder
def tryParquet[T <: CustomRow: Encoder](...)
Run Code Online (Sandbox Code Playgroud)
这样编译器就可以证明df.as[T]在构造对象时提供了一个编码器.
| 归档时间: |
|
| 查看次数: |
1798 次 |
| 最近记录: |