如何将时间戳记作为额外列添加到数据框

jac*_*hik 4 immutability apache-spark rdd spark-dataframe

*大家好,

我对大家有一个简单的问题。我有一个使用createStream方法从kafka流创建的RDD。现在我想在转换为数据帧之前将时间戳记作为此rdd的值添加。我尝试使用withColumn()向数据框添加值,但返回此错误*

val topicMaps = Map("topic" -> 1)
    val now = java.util.Calendar.getInstance().getTime()

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

      messages.foreachRDD(rdd =>
          {

            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            import sqlContext.implicits._

            val dataframe = sqlContext.read.json(rdd.map(_._2))



        val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))
Run Code Online (Sandbox Code Playgroud)

val d = dataframe.withColumn(“ timeStamp_column”,dataframe.col(“ now”))org.apache.spark.sql.AnalysisException:无法解析(动作,device_os_ver,device_type,event_name,item_name,lat, ,lon,memberid,productUpccd,tenantid);在org.apache.spark.sql.DataFrame $$ anonfun $ resolve $ 1.apply(DataFrame.scala:15

据我所知,DataFrames不可更改,因为它们是不可变的,但RDD也是不可变的。那么什么是最好的方法。如何将值添加到RDD(将时间戳动态添加到RDD)。

ven*_*kat 7

尝试使用current_timestamp函数。

current_timestamp() //org.apache.spark.sql.functions._    
df.withColumn("time_stamp", lit(current_timestamp()))
Run Code Online (Sandbox Code Playgroud)


Jav*_*tón 4

要添加具有时间戳等常量的新列,您可以使用lit函数:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
Run Code Online (Sandbox Code Playgroud)