Parsing date-time information from a CSV in Zeppelin and Spark

Jin*_*Yoo 5 csv datetime scala apache-spark

I'm trying to read a CSV file and build a DataFrame.

The CSV is formatted as shown below. I use the ISO 8601 format for the date/time strings.

2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1
......

To load this data, I wrote the following Scala code in Zeppelin:

import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import sys.process._

val logCSV = sc.textFile("log_table.csv")

case class record(
    callingTime:DateTime, 
    userID:String, 
    CID:String, 
    serverConnectionTime:DateTime, 
    serverResponseTime:DateTime, 
    connectedAgentID:String, 
    beginCallingTime:DateTime, 
    endCallingTime:DateTime, 
    Succeed:Int)


// MM = month of year, HH = hour of day (0-23); mm would be minutes and kk a 1-24 clock
val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")

val logTable = logCSV.map(s => s.split(",") ).map(
    s => record(
            formatter.parseDateTime( s(0) ), 
            s(1),
            s(2),
            formatter.parseDateTime( s(3) ), 
            formatter.parseDateTime( s(4) ), 
            s(5),
            formatter.parseDateTime( s(6) ), 
            formatter.parseDateTime( s(7) ),            
            s(8).toInt
        )
).toDF()

It fails with the error below. The main problem is that DateTime is not serializable.

logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169
defined class record
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)

So I'd like to know how to handle date/time information in Scala. Could you help me?

hd1*_*hd1 2

Although DateTime is not serializable, if you use DateTimeFormatter's parseMillis method you get back a long, which bridges for free to Long (which is serializable). To get a DateTime back from a Long, use the constructor: DateTime(longInstance.longValue()).
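
A minimal sketch of this approach (not the answerer's exact code), mirroring the case class and column indices from the question; adjust the indices to your actual CSV layout. Timestamps are stored as epoch millis (Long), and because the Joda formatter itself is not serializable either, it is built inside mapPartitions on the executors instead of being captured from the driver:

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Store timestamps as epoch millis (Long), which is serializable
case class record(
    callingTime: Long,
    userID: String,
    CID: String,
    serverConnectionTime: Long,
    serverResponseTime: Long,
    connectedAgentID: String,
    beginCallingTime: Long,
    endCallingTime: Long,
    Succeed: Int)

val logCSV = sc.textFile("log_table.csv")

val logTable = logCSV.map(_.split(",")).mapPartitions { rows =>
    // Created once per partition on the executor, so it is never serialized
    val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
    rows.map(s => record(
        formatter.parseMillis(s(0)),   // Long instead of DateTime
        s(1),
        s(2),
        formatter.parseMillis(s(3)),
        formatter.parseMillis(s(4)),
        s(5),
        formatter.parseMillis(s(6)),
        formatter.parseMillis(s(7)),
        s(8).toInt                     // adjust index to wherever your flag column is
    ))
}.toDF()

// Rebuild a DateTime from the stored millis only where it is actually needed:
val firstCall = new DateTime(logTable.first().getAs[Long]("callingTime"))

Keeping the fields as Long means the whole record stays serializable and maps directly onto a DataFrame column type; the DateTime is reconstructed only at the point where calendar arithmetic is required.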