Null values in a field generate a MatchError

the*_*tom 2 scala apache-spark

Here is something curious:

val rddSTG = sc.parallelize(
  List( ("RTD", "ANT", "SOYA BEANS", "20161123", "20161123", 4000,  "docid11", null, 5),
        ("RTD", "ANT", "SOYA BEANS", "20161124", "20161123", 6000,  "docid11", null, 4),
        ("RTD", "ANT", "BANANAS",    "20161124", "20161123", 7000,  "docid11", null, 9),
        ("HAM", "ANT", "CORN",       "20161123", "20161123", 1000,  "docid22", null, 33),
        ("LIS", "PAR", "BARLEY",     "20161123", "20161123", 11111, "docid33", null, 44)
  )
)

val dataframe = rddSTG.toDF("ORIG", "DEST", "PROD", "PLDEPDATE", "PLARRDATE", "PLCOST", "docid", "ACTARRDATE", "mutationseq")
dataframe.createOrReplaceTempView("STG")
spark.sql("SELECT * FROM STG ORDER BY PLDEPDATE DESC").show()

It produces the following error:

scala.MatchError: Null (of class scala.reflect.internal.Types$TypeRef$$anon$6)

As soon as I change one of the null values to something non-null, it works. I think I understand why — the type of that field cannot be inferred — but it still seems odd. Any ideas?

小智 5

The problem is that the type Scala infers for that tuple element is too generic for Spark. A bare null is typed as Scala's bottom type Null (the MatchError even names it), which is not a concrete data type.

Spark's schema inference has no mapping from that type to a SQL type, so it simply does not know how to serialize the column.

You need to give the column an explicit, specific type.

Since null cannot be assigned to a primitive type in Scala, cast it to String (or whatever type the other values in that column have) so that the inferred tuple type is concrete.

So try this:

val sampleRdd = spark.sparkContext.parallelize(
  Seq(
    (1, null.asInstanceOf[String], 100, "YES"),
    (2, "RAKTOTPAL", 200, "NO"),
    (3, "BORDOLOI", 300, "YES"),
    (4, null.asInstanceOf[String], 400, "YES")))

sampleRdd.toDF("ID", "NAME", "SCORE", "FLAG")

This way, the DataFrame retains the null values.
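A more idiomatic alternative, if you control how the data is built, is to model nullable fields as Option: Spark's encoders map None to a SQL NULL and still infer the correct column type. This is a sketch assuming a local SparkSession named spark; the data and column names are illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local session for the sketch.
val spark = SparkSession.builder().master("local[*]").appName("nulls").getOrCreate()
import spark.implicits._

val sampleRdd = spark.sparkContext.parallelize(
  Seq(
    (1, Option.empty[String], 100, "YES"),   // None becomes NULL in the DataFrame
    (2, Some("RAKTOTPAL"),    200, "NO"),
    (3, Some("BORDOLOI"),     300, "YES")))

// The NAME column is inferred as a nullable StringType.
sampleRdd.toDF("ID", "NAME", "SCORE", "FLAG").show()
```

Because Option[String] is a concrete type, inference never sees the bottom type Null, so the MatchError cannot occur.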

Another way

Use a case class, so the field types are declared up front:

case class Record(id: Int, name: String, score: Int, flag: String)

val sampleRdd = spark.sparkContext.parallelize(
  Seq(
    Record(1, null, 100, "YES"),   // name is declared as String, so a bare null is fine
    Record(2, "RAKTOTPAL", 200, "NO"),
    Record(3, "BORDOLOI", 300, "YES"),
    Record(4, null, 400, "YES")))

sampleRdd.toDF()
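If every value in a column is null (as with ACTARRDATE in the question, where no row carries a value to infer from), you can also skip inference entirely and supply an explicit schema via createDataFrame over an RDD of Rows. This is a sketch assuming a local SparkSession; the three columns shown are a subset of the question's data.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Assumed local session for the sketch.
val spark = SparkSession.builder().master("local[*]").appName("schema").getOrCreate()

// Declare the type of the all-null column explicitly and mark it nullable.
val schema = StructType(Seq(
  StructField("docid",       StringType,  nullable = false),
  StructField("ACTARRDATE",  StringType,  nullable = true),
  StructField("mutationseq", IntegerType, nullable = false)))

val rows = spark.sparkContext.parallelize(Seq(
  Row("docid11", null, 5),
  Row("docid22", null, 33)))

spark.createDataFrame(rows, schema).show()
```

With an explicit schema there is nothing to infer, so the nulls are accepted directly.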