将字符串字段转换为Spark中的时间戳的更好方法

use*_*109 20 scala apache-spark apache-spark-sql

我有一个CSV,其中一个字段是特定格式的日期时间.我无法直接在我的Dataframe中导入它,因为它需要是一个时间戳.所以我将它作为字符串导入并将其转换为Timestamp这样的

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row

def getTimestamp(x:Any) : Timestamp = {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    if (x.toString() == "") 
    return null
    else {
        val d = format.parse(x.toString());
        val t = new Timestamp(d.getTime());
        return t
    }
}

def convert(row : Row) : Row = {
    val d1 = getTimestamp(row(3))
    return Row(row(0),row(1),row(2),d1)
}
Run Code Online (Sandbox Code Playgroud)

使用Dataframe API或spark-sql有更好,更简洁的方法吗?上述方法需要创建RDD并再次为Dataframe提供架构.

zer*_*323 48

Spark> = 2.2

从2.2开始,您可以直接提供格式字符串:

import org.apache.spark.sql.functions.to_timestamp

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+-------------------+
// |id |dts                |ts                 |
// +---+-------------------+-------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01|
// |2  |#$@#@#             |null               |
// +---+-------------------+-------------------+
Run Code Online (Sandbox Code Playgroud)

Spark> = 1.6,<2.2

您可以使用Spark 1.5中引入的日期处理功能.假设您有以下数据:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#$@#@#")).toDF("id", "dts")
Run Code Online (Sandbox Code Playgroud)

您可以使用unix_timestamp解析字符串并将其强制转换为时间戳

import org.apache.spark.sql.functions.unix_timestamp

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp")

df.withColumn("ts", ts).show(2, false)

// +---+-------------------+---------------------+
// |id |dts                |ts                   |
// +---+-------------------+---------------------+
// |1  |05/26/2016 01:01:01|2016-05-26 01:01:01.0|
// |2  |#$@#@#             |null                 |
// +---+-------------------+---------------------+
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,它涵盖了解析和错误处理.格式字符串应与Java兼容SimpleDateFormat.

Spark> = 1.5,<1.6

你必须使用这样的东西:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp")
Run Code Online (Sandbox Code Playgroud)

要么

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp")
Run Code Online (Sandbox Code Playgroud)

由于SPARK-11724.

Spark <1.5

你应该可以使用exprHiveContext.


jar*_*daf 6

我还没有使用Spark SQL,但我认为这将是更惯用的scala(null使用不被认为是一个好习惯):

def getTimestamp(s: String) : Option[Timestamp] = s match {
  case "" => None
  case _ => {
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => Some(t)
      case Failure(_) => None
    }    
  }
}
Run Code Online (Sandbox Code Playgroud)

请注意我假设您Row事先知道元素类型(如果您从csv文件中读取它们,它们都是String),这就是为什么我使用正确的类型String而不是Any(一切都是子类型Any).

它还取决于您希望如何处理解析异常.在这种情况下,如果发生解析异常,None则只返回a.

您可以进一步使用它:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3))
Run Code Online (Sandbox Code Playgroud)