Joda DateTime formatting causes NullPointerException in Spark RDD functions

bou*_*eli 9 scala apache-spark

The exception message is as follows:

User class threw exception: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException
    at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:133)
    at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625)
    at org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328)
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28)
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327)
    at org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41)
    at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

My code is as follows:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }




object DateTimeNullReferenceReappear extends App {

  case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0) 

  val cfg = new Configuration
  val sparkConf = new SparkConf()
  sparkConf.setAppName("bourne_exception_reappear")
  val sc = new SparkContext(sparkConf)

  val data = TDWSparkContext.tdwTable(   // this function just reads data from a data warehouse
    sc,
    tdwuser = FaceConf.TDW_USER,
    tdwpasswd = FaceConf.TDW_PASSWORD,
    dbName = "my_db",
    tblName = "my_table",
    parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
    .map(row => {
      Record(uin = row(2),
        date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
        value = row(4).toDouble)
    })
    .map(x => (x.uin, (x.date, x.value)))
    .groupByKey
    .map(x => {
      x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum)   // throws the exception here
    })

//  val data = TDWSparkContext.tdwTable(  // this version works, as DateTime.toString is called before the shuffle, not inside the groupBy
//    sc,
//    tdwuser = FaceConf.TDW_USER,
//    tdwpasswd = FaceConf.TDW_PASSWORD,
//    dbName = "hy",
//    tblName = "t_dw_cf_oss_tblogin",
//    parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
//    .map(row => {
//      Record(uin = row(2),
//        date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
//        value = row(4).toDouble)
//    })
//    .map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
//    .groupByKey
//    .map(x => {
//      x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
//    })

  data.take(10).map(println)

}

So it seems that calling toString inside the groupBy triggers the exception. Can somebody explain why?

Thanks.

Mar*_*ier 10

You need to disable Kryo, use the Kryo JodaTime serializers, or avoid serializing the DateTime object altogether, i.e. pass Longs around instead.
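The answer does not spell out the mechanism, but the likely cause is this: Kryo's default FieldSerializer skips transient fields and bypasses constructors, so the CachedDateTimeZone inside a DateTime deserialized on an executor arrives with its transient cache still null, and the first toString("yyyyMMdd") dereferences it. A minimal sketch of the second option, assuming the de.javakaffee kryo-serializers artifact is on the classpath and com.example is a placeholder package:

import com.esotericsoftware.kryo.Kryo
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.DateTime

// Register a Joda-aware serializer so Kryo round-trips DateTime correctly
class JodaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[DateTime], new JodaDateTimeSerializer)
  }
}

// Driver-side wiring: point Spark's Kryo at the registrator above
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.JodaKryoRegistrator")

The third option needs no extra dependency: ship the epoch millis (a Long) across the shuffle and rebuild the DateTime only where it is consumed. A hypothetical reworking of the failing steps from the question's pipeline:

// Pass x.date.getMillis instead of the DateTime itself
.map(x => (x.uin, (x.date.getMillis, x.value)))
.groupByKey
.map { case (_, pairs) =>
  // The DateTime is constructed locally on the executor, so its time zone
  // internals are initialized normally and toString is safe here
  pairs.groupBy { case (millis, _) => new DateTime(millis).toString("yyyyMMdd") }
       .mapValues(_.map(_._2).sum)
}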