Context
I am using Spark 1.5.
I have a file, records.txt, which is Ctrl-A delimited. In that file, index 31 holds the subscriber_id. For some records the subscriber_id is empty; for others it is not.
Here is a record where subscriber_id (UK8jikahasjp23) is present, just before the last attribute:
99^A2013-12-11^A23421421412^Aqweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^AUK8jikahasjp23^A
Here is a record where subscriber_id is empty:
23^A2013-12-11^A23421421412^Aqweqweqw2222^A34232432432^A365633049^A1^A6yudgfdhaf9923^AAC^APrimary DTV^AKKKR DATA+ PVR3^AGrundig^AKKKR PVR3^AKKKR DATA+ PVR3^A127b146^APVR3^AYes^ANo^ANo^ANo^AYes^AYes^ANo^A2017-08-07 21:27:30.000000^AYes^ANo^ANo^A6yudgfdhaf9923^A7290921396551747605^A2013-12-11 16:00:03.000000^A7022497306379992936^A^A
Problem
I am getting a java.lang.ArrayIndexOutOfBoundsException for the records with an empty subscriber_id.
Why does Spark throw java.lang.ArrayIndexOutOfBoundsException for empty values of the subscriber_id field?

16/08/20 10:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0: java.lang.ArrayIndexOutOfBoundsException: 31
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext
import org.slf4j.LoggerFactory

case class CustomerCard(accountNumber: String, subscriber_id: String, subscriptionStatus: String)

object CustomerCardProcess {

  val log = LoggerFactory.getLogger(this.getClass.getName)

  def doPerform(sc: SparkContext, sqlContext: HiveContext, custCardRDD: RDD[String]): DataFrame = {
    import sqlContext.implicits._

    log.info("doCustomerCardProcess method started")
    val splitRDD = custCardRDD.map(elem => elem.split("\\u0001"))
    val schemaRDD = splitRDD.map(arr => new CustomerCard(arr(3).trim, arr(31).trim, arr(8).trim))
    schemaRDD.toDF().registerTempTable("customer_card")
    val custCardDF = sqlContext.sql(
      """
        |SELECT
        |  accountNumber,
        |  subscriber_id
        |FROM
        |  customer_card
        |WHERE
        |  subscriptionStatus IN ('AB', 'AC', 'PC')
        |  AND accountNumber IS NOT NULL AND LENGTH(accountNumber) > 0
      """.stripMargin)
    log.info("doCustomerCardProcess method ended")
    custCardDF
  }
}
Error
13/09/12 23:22:18 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 8.0 (TID 595): java.lang.ArrayIndexOutOfBoundsException: 31
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at com.org.CustomerCardProcess$$anonfun$2.apply(CustomerCardProcess.scala:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:118)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Can anyone help me resolve this issue?
The split function discards all empty fields at the end of the line being split. So,
change the following line
val splitRDD = custCardRDD.map(elem => elem.split("\\u0001"))
to
val splitRDD = custCardRDD.map(elem => elem.split("\\u0001", -1))
The -1 limit tells split to keep all fields, including trailing empty ones.
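This is standard Java/Scala String.split behavior, not something Spark-specific: with no limit argument, trailing empty strings are removed from the result, so the record with an empty subscriber_id produces an array with fewer than 32 elements and arr(31) goes out of bounds. A minimal sketch of the difference, runnable in plain Scala without Spark (SplitLimitDemo is just an illustrative name):

```scala
object SplitLimitDemo {
  def main(args: Array[String]): Unit = {
    // A tiny stand-in for a Ctrl-A (\u0001) delimited record whose
    // last two fields are empty, like the failing subscriber_id rows.
    val line = "a\u0001b\u0001\u0001"

    // Default split (limit 0) drops trailing empty strings.
    val dropped = line.split("\u0001")
    println(dropped.length)       // prints 2 -> Array("a", "b")

    // With limit -1, every field is kept, empty or not.
    val kept = line.split("\u0001", -1)
    println(kept.length)          // prints 4 -> Array("a", "b", "", "")
  }
}
```

With limit -1 the array always has one field per delimiter position, so a fixed index like arr(31) stays valid even when the trailing fields are empty.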