cer*_*ier 5 scala apache-spark apache-spark-sql
使用Spark 2.x,我似乎无法使用由case类组成的Row的RDD创建Dataframe.
它在Spark 1.6.x上运行良好,但在2.x运行异常时失败:
java.lang.RuntimeException: Timestamp is not a valid external type for schema of struct<seconds:bigint,nanos:int>
Run Code Online (Sandbox Code Playgroud)
之前是来自Catalyst的一堆生成代码.
这是片段(我正在做的简化版):
package main
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
object Test {
case class Timestamp(seconds: Long, nanos: Int)
val TIMESTAMP_TYPE = StructType(List(
StructField("seconds", LongType, false),
StructField("nanos", IntegerType, false)
))
val SCHEMA = StructType(List(
StructField("created_at", TIMESTAMP_TYPE, true)
))
def main(args: Array[String]) {
val spark = SparkSession.builder().getOrCreate()
val rowRDD = spark.sparkContext.parallelize(Seq((0L, 0))).map {
case (seconds: Long, nanos: Int) => {
Row(Timestamp(seconds, nanos))
}
}
spark.createDataFrame(rowRDD, SCHEMA).show(1)
}
}
Run Code Online (Sandbox Code Playgroud)
我不确定这是一个Spark bug还是我在文档中遗漏的东西(我知道Spark 2.x引入了运行时行编码验证,也许这是相关的)
非常感谢
我不确定它是否是一个错误,但混合动态类型Row,案例类和显式架构没有多大意义.使用Rows和架构:
import collection.mutable._
import collection.JavaConverters._
spark.createDataFrame(ArrayBuffer(Row(Row(0L, 0))).asJava, SCHEMA)
Run Code Online (Sandbox Code Playgroud)
或案例类:
import spark.implicits._
Seq(Tuple1(Timestamp(0L, 0))).toDF("created_at")
Run Code Online (Sandbox Code Playgroud)
否则你只是两次做同样的工作.
注意:
如果你想表达可以为空的字段,你可以使用Options.例如
case class Record(created_at: Option[Timestamp])
case class Timestamp(seconds: Long, nanos: Option[Int])
Seq(Record(Some(Timestamp(0L, Some(0))))).toDF
Run Code Online (Sandbox Code Playgroud)
将生成架构在哪里created_at和created_at.milliseconds可以NULL,但created_at.seconds必须设置,如果created_at 不是NULL.
| 归档时间: |
|
| 查看次数: |
2699 次 |
| 最近记录: |