1.5.1| 星火流 | 带有 SQL createDataFrame 的 NullPointerException

Yuk*_*ura 1 apache-spark spark-streaming apache-spark-sql

我正在使用 Spark 1.5.1。

在流中context我得到SQLContext如下

SQLContext sqlContext = SQLContext.getOrCreate(records.context()); DataFrame dataFrame = sqlContext.createDataFrame(record, SchemaRecord.class); dataFrame.registerTempTable("records");

record 是一个 JavaRDD 每个 Record 具有以下结构

public class SchemaRecord implements Serializable {

private static final long serialVersionUID = 1L; 
private String msisdn; 
private String application_type; 
//private long uplink_bytes = 0L;
}
Run Code Online (Sandbox Code Playgroud)

当 msisdn 和 application_type 等字段类型只是字符串时,一切正常。

当我添加另一个字段,如Uplink_bytes 是Long类型时,我在createDataFrame 处得到以下NullPointer Exception

Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at 
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.
catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1031)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:519)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:548)
Run Code Online (Sandbox Code Playgroud)

请建议

man*_*uel 5

您的问题可能是您的模型类不是一个干净的 JavaBean。目前 Spark 没有代码来处理具有 setter 但没有 getter 方法的属性。你可以简单地尝试这样的事情来检查 Spark 如何理解你的类:

PropertyDescriptor[] props = Introspector.getBeanInfo(YourClass.class).getPropertyDescriptors();
for(PropertyDescriptor prop:props) {
    System.out.println(prop.getDisplayName());
    System.out.println("\t"+prop.getReadMethod());
    System.out.println("\t"+prop.getWriteMethod());
}
Run Code Online (Sandbox Code Playgroud)

内省器还将只有 setter 的字段识别为 preoperties,这会在 Spark 中抛出 NullPointerException。