Mag*_*n88 3 scala apache-spark apache-spark-sql
使用 Spark 2.x,我正在使用数据帧。
val proposals = spark.read
.option("header", true)
.option("inferSchema", true)
.option("delimiter", ";")
.csv("/proposals.txt.gz")
proposals.printSchema()
Run Code Online (Sandbox Code Playgroud)
效果很好并给出:
root
|-- MARKETCODE: string (nullable = true)
|-- REFDATE: string (nullable = true)
|-- UPDTIME: string (nullable = true)
|-- UPDTIMEMSEC: integer (nullable = true)
|-- ENDTIME: string (nullable = true)
|-- ENDTIMEMSEC: integer (nullable = true)
|-- BONDCODE: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我想计算以毫秒为单位的时间,因此编写了一个函数:
def time2usecs( time:String, msec:Int )={
val Array(hour,minute,seconds) = time.split(":").map( _.toInt )
msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000
}
time2usecs( "08:13:44", 111 )
time2usecs: (time: String, msec: Int)Int
res90: Int = 29624111
Run Code Online (Sandbox Code Playgroud)
谜题最后的和平可能是这样的:
proposals.withColumn( "utime",
proposals.select("UPDTIME","UPDTIMEMSEC")
.map( (t,tms) => time2usecs(t,tms) ))
Run Code Online (Sandbox Code Playgroud)
但我不知道如何完成这个df.select(column1, column2).map(...)部分。
在 Spark 中的数据帧列上使用方法的常见方法是定义一个UDF(用户定义函数,请参阅此处以获取更多信息)。对于你的情况:
import org.apache.spark.sql.functions.udf
import spark.implicits._
val time2usecs = udf((time: String, msec: Int) => {
val Array(hour,minute,seconds) = time.split(":").map( _.toInt )
msec + seconds.toInt*1000 + minute.toInt*60*1000 + hour.toInt*60*60*1000
})
val df2 = df.withColumn("utime", time2usecs($"UPDTIME", $"UPDTIMEMSEC"))
Run Code Online (Sandbox Code Playgroud)
spark.implicits._在此导入以允许使用$该col()函数的简写。
| 归档时间: |
|
| 查看次数: |
16845 次 |
| 最近记录: |