如何使用Scala在Spark 2.1中将毫秒级字符串列转换为毫秒级时间戳?

kei*_*fly 7 datetime scala apache-spark

我在Scala中使用Spark 2.1。

如何将以毫秒为单位的字符串列转换为以毫秒为单位的时间戳?

我从问题更好的方法中尝试了以下代码,将字符串字段转换为Spark中的时间戳

import org.apache.spark.sql.functions.unix_timestamp
val tdf = Seq((1L, "05/26/2016 01:01:01.601"), (2L, "#$@#@#")).toDF("id", "dts")
val tts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss.SSS").cast("timestamp")
tdf.withColumn("ts", tts).show(2, false)
Run Code Online (Sandbox Code Playgroud)

但是我得到的结果没有毫秒:

+---+-----------------------+---------------------+
|id |dts                    |ts                   |
+---+-----------------------+---------------------+
|1  |05/26/2016 01:01:01.601|2016-05-26 01:01:01.0|
|2  |#$@#@#                 |null                 |
+---+-----------------------+---------------------+
Run Code Online (Sandbox Code Playgroud)

kei*_*fly 7

具有SimpleDateFormat的UDF可以工作。这个想法来自Ram Ghadiyaram与UDF 逻辑的联系。

import java.text.SimpleDateFormat
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf
import scala.util.{Try, Success, Failure}

val getTimestamp: (String => Option[Timestamp]) = s => s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss.SSS")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}

val getTimestampUDF = udf(getTimestamp)
val tdf = Seq((1L, "05/26/2016 01:01:01.601"), (2L, "#$@#@#")).toDF("id", "dts")
val tts = getTimestampUDF($"dts")
tdf.withColumn("ts", tts).show(2, false)
Run Code Online (Sandbox Code Playgroud)

输出:

+---+-----------------------+-----------------------+
|id |dts                    |ts                     |
+---+-----------------------+-----------------------+
|1  |05/26/2016 01:01:01.601|2016-05-26 01:01:01.601|
|2  |#$@#@#                 |null                   |
+---+-----------------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)


Pau*_*vis 6

有比制作UDF更简单的方法。只需解析毫秒数据并将其添加到unix时间戳中(以下代码与pyspark一起使用,并且应该与scala等效)非常接近:

timeFmt = "yyyy/MM/dd HH:mm:ss.SSS"
df = df.withColumn('ux_t', unix_timestamp(df.t, format=timeFmt) + substring(df.t, -3, 3).cast('float')/1000)
Run Code Online (Sandbox Code Playgroud)

结果:“ 2017/03/05 14:02:41.865”转换为1488722561.865