I am preparing input data for a classifier in Pyspark. I have been using aggregate functions in SparkSQL to extract features such as the mean and variance, grouped by activity, name, and window. The window is computed by dividing the unix timestamp by 10000 to bin the data into 10-second windows.
sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window")
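For context, the window column itself can be derived before the aggregation; a minimal sketch of that binning (assuming the timestamps are unix milliseconds, so 10000 ms = one 10-second bucket, and assuming the column is named timestamp):

```python
from pyspark.sql.functions import col, floor

# Bin each record into a 10-second window by integer-dividing the
# millisecond unix timestamp by 10000 (column names are assumptions).
data = data.withColumn("window", floor(col("timestamp") / 10000))
```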
The result looks like:
Activity   Name            Window   AvgX   VarX
Walk       accelerometer   95875    2.0    1.0
What I want to do now is calculate the average slope of the points in X.

To do this I need the timestamp, the window, and X. I have already implemented the logic in Python using arrays; this is what it looks like: compute the slope between each pair of consecutive points, then take the average slope. Ideally I would like to do this in a UDAF, which Pyspark does not yet support. (If the function below were called slope, then in SQL you could write slope(timestamp, X) as avgSlopeX.)

EDIT - changed the input to make it clearer. What I am doing is computing the slope between each pair of consecutive points and then returning the average of those slopes within the window. So, just as I get the mean and variance for each window, I also want the average slope.
# sample input
timestamp = [1464703425544, 1464703426534, 1464703427551, 1464703428587,
             1464703429512, 1464703430493, 1464703431505, 1464703432543,
             1464703433513, 1464703434529]
values = [1021.31, 1021.26, 1021.19, 1021.19, 1021.1, 1021.1, 1021.1,
          1021.05, 1021.02]

# slope between each consecutive pair of points, then the mean slope
# (the sample lists differ in length, so iterate over the shorter one)
n = min(len(timestamp), len(values))
totalSlope = 0.0
for i in range(n - 1):
    y2, y1 = values[i + 1], values[i]
    x2, x1 = timestamp[i + 1], timestamp[i]
    totalSlope += (y2 - y1) / (x2 - x1)

avgSlope = totalSlope / (n - 1)
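For reference, this is what the same average-slope calculation would boil down to after a pandas/numpy conversion; a sketch over the sample arrays above:

```python
import numpy as np

# Trim to the common length, since the sample lists differ in size.
n = min(len(timestamp), len(values))
ts = np.array(timestamp[:n], dtype=float)
vals = np.array(values[:n], dtype=float)

# Pairwise slopes between consecutive points, then their mean.
slopes = np.diff(vals) / np.diff(ts)
avg_slope = slopes.mean()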
How can I implement this? Should I try converting to a pandas dataframe and then to a numpy array? If so, how can I make sure the data is still mapped correctly, keeping in mind the GROUP BY activity, name, window in the SQL query?
Generally speaking, this is not a job for a UDAF, because a UDAF provides no way to define an ordering. It looks like what you really need is a combination of window functions and standard aggregations.
from pyspark.sql.functions import col, lag, avg
from pyspark.sql.window import Window

df = ...  # DataFrame[activity: string, name: string, window: bigint,
          #           timestamp: bigint, value: float]

group = ["activity", "name", "window"]

# Order each (activity, name, window) partition by timestamp so that
# lag() returns the previous point within the same window.
w = Window.partitionBy(*group).orderBy("timestamp")

# Slope between each point and the previous one in the partition.
v_diff = col("value") - lag("value", 1).over(w)
t_diff = col("timestamp") - lag("timestamp", 1).over(w)
slope = v_diff / t_diff

# Average the per-point slopes within each group.
df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope")))
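Since the goal is to get the average slope alongside the existing mean and variance, the slope column can be folded into the same aggregation; a sketch, assuming the value column here corresponds to acc_x in the original query:

```python
from pyspark.sql.functions import avg, variance

result = (df
    .withColumn("slope", slope)
    .groupBy(*group)
    .agg(avg("value").alias("avgX"),
         variance("value").alias("varX"),
         avg("slope").alias("avgSlopeX"))
    .orderBy(*group))
```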