Car*_*Pun 1 scala dataframe apache-spark apache-spark-sql
假设我有一个数据帧(存储在scala val中df
),其中包含来自csv的数据:
time,temperature
0,65
1,67
2,62
3,59
Run Code Online (Sandbox Code Playgroud)
我从文件中读取这个作为scala语言中的spark数据帧没有问题.
我想添加一个过滤列(通过过滤器我的意思是信号处理移动平均过滤),(比方说我想做(T[n]+T[n-1])/2.0
):
time,temperature,temperatureAvg
0,65,(65+0)/2.0
1,67,(67+65)/2.0
2,62,(62+67)/2.0
3,59,(59+62)/2.0
Run Code Online (Sandbox Code Playgroud)
(实际上,对于第一行说,我想要32.5
而不是(65+0)/2.0
.我写了它来澄清预期的2步时间过滤操作输出)
那么如何实现呢?我不熟悉火花数据帧操作,它沿着列迭代地组合行...
Spark 2.0+
在Spark 2.0及更高版本中,可以使用window
函数作为输入groupBy
.它允许你指定windowDuration
,slideDuration
和startTime
(偏移).它仅适用于TimestampType
列,但找到解决方法并不困难.在您的情况下,它将需要一些额外的步骤来纠正边界,但一般解决方案可以表示如下所示:
import org.apache.spark.sql.functions.{window, avg}
df
.withColumn("ts", $"time".cast("timestamp"))
.groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second"))
.avg("temperature")
Run Code Online (Sandbox Code Playgroud)
Spark <2.0
如果有一种自然的方式来划分数据,您可以使用窗口函数,如下所示:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean
val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)
val df = sc.parallelize(Seq(
(1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
)).toDF("id", "time", "temperature")
df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show
// +---+----+-----------+--------------+
// | id|time|temperature|temperatureAvg|
// +---+----+-----------+--------------+
// | 1| 0| 65| 65.0|
// | 1| 1| 67| 66.0|
// | 1| 2| 62| 64.5|
// | 1| 3| 59| 60.5|
// +---+----+-----------+--------------+
Run Code Online (Sandbox Code Playgroud)
您可以使用lead
/ lag
functions 创建具有任意权重的窗口:
lit(0.6) * $"temperature" +
lit(0.3) * lag($"temperature", 1) +
lit(0.2) * lag($"temperature", 2)
Run Code Online (Sandbox Code Playgroud)
没有partitionBy
条款仍然可能,但效率极低.如果是这种情况,您将无法使用DataFrames
.相反,您可以使用sliding
RDD(例如,参见在Spark中操作RDD中的邻居元素).也有火花的时间序列包可能对你有用.
归档时间: |
|
查看次数: |
1176 次 |
最近记录: |