Shu*_*ail 5 dataframe apache-spark rdd apache-spark-sql pyspark
我有来自车辆的CSV时间序列数据,其中包含以下信息:
数据如下所示:
trip-id | timestamp | speed
001 | 1538204192 | 44.55
001 | 1538204193 | 47.20 <-- start of brake
001 | 1538204194 | 42.14
001 | 1538204195 | 39.20
001 | 1538204196 | 35.30
001 | 1538204197 | 32.22 <-- end of brake
001 | 1538204198 | 34.80
001 | 1538204199 | 37.10
...
001 | 1538204221 | 55.30
001 | 1538204222 | 57.20 <-- start of brake
001 | 1538204223 | 54.60
001 | 1538204224 | 52.15
001 | 1538204225 | 49.27
001 | 1538204226 | 47.89 <-- end of brake
001 | 1538204227 | 50.57
001 | 1538204228 | 53.72
...
Run Code Online (Sandbox Code Playgroud)
当speed基于的2个连续记录减少时,发生制动事件timestamp。
我想从数据中提取的制动事件的事件而言start timestamp,end timestamp,start speed和end speed。
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
这是我的看法:
trip-id,由排序timestamp。lag在连续的行上移动并计算速度差。我被困在这里,因为我没有key属于同一组,因此我可以应用基于密钥的聚合。
我的问题是:
如何根据时间戳的差异映射以添加key列?因此,如果2条记录相差1秒,则它们应具有一个公共密钥。这样,我可以根据新添加的键来减少组。
有没有更好,更优化的方法来实现这一目标?我的方法可能效率很低,因为它需要逐行比较。在属于特定事件(来自单次车辆行驶的数据)的数据流中,检测这些“子事件”(例如,制动事件)的其他可能方法还有哪些?
提前致谢!
附录:
对于 Pandas 用户,几乎有一种常见的编程模式使用shift()+cumsum()来设置组标签,以识别与某些特定模式/条件匹配的连续行。使用 pyspark,我们可以使用 Window 函数lag()+sum()来做同样的事情并找到这个组标签(d2在下面的代码中):
from pyspark.sql import functions as F, Window
>>> df.orderBy('timestamp').show()
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
| 001|1538204192|44.55|
| 001|1538204193|47.20|
| 001|1538204194|42.14|
| 001|1538204195|39.20|
| 001|1538204196|35.30|
| 001|1538204197|32.22|
| 001|1538204198|34.80|
| 001|1538204199|37.10|
| 001|1538204221|55.30|
| 001|1538204222|57.20|
| 001|1538204223|54.60|
| 001|1538204224|52.15|
| 001|1538204225|49.27|
| 001|1538204226|47.89|
| 001|1538204227|50.57|
| 001|1538204228|53.72|
+-------+----------+-----+
>>> df.printSchema()
root
|-- trip-id: string (nullable = true)
|-- unix_timestamp: integer (nullable = true)
|-- speed: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
# Window spec used to find previous speed F.lag('speed').over(w1) and also do the cumsum() to find flag `d2`
w1 = Window.partitionBy('trip-id').orderBy('timestamp')
# Window spec used to find the minimal value of flag `d1` over the partition(`trip-id`,`d2`)
w2 = Window.partitionBy('trip-id', 'd2').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Run Code Online (Sandbox Code Playgroud)
d3 : 标识分区上 d1 的最小值的标志('trip-id', 'd2'),只有当d3 == 0该行属于具有速度下降的组时。这将用于过滤掉不相关的行
df_1 = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
.withColumn('d2', F.sum('d1').over(w1)) \
.withColumn('d3', F.min('d1').over(w2))
>>> df_1.orderBy('timestamp').show()
+-------+----------+-----+---+---+---+
|trip-id| timestamp|speed| d1| d2| d3|
+-------+----------+-----+---+---+---+
| 001|1538204192|44.55| 1| 1| 1|
| 001|1538204193|47.20| 1| 2| 0|
| 001|1538204194|42.14| 0| 2| 0|
| 001|1538204195|39.20| 0| 2| 0|
| 001|1538204196|35.30| 0| 2| 0|
| 001|1538204197|32.22| 0| 2| 0|
| 001|1538204198|34.80| 1| 3| 1|
| 001|1538204199|37.10| 1| 4| 1|
| 001|1538204221|55.30| 1| 5| 1|
| 001|1538204222|57.20| 1| 6| 0|
| 001|1538204223|54.60| 0| 6| 0|
| 001|1538204224|52.15| 0| 6| 0|
| 001|1538204225|49.27| 0| 6| 0|
| 001|1538204226|47.89| 0| 6| 0|
| 001|1538204227|50.57| 1| 7| 1|
| 001|1538204228|53.72| 1| 8| 1|
+-------+----------+-----+---+---+---+
Run Code Online (Sandbox Code Playgroud)df_1 = df_1.where('d3 == 0')
>>> df_1.orderBy('timestamp').show()
+-------+----------+-----+---+---+---+
|trip-id| timestamp|speed| d1| d2| d3|
+-------+----------+-----+---+---+---+
| 001|1538204193|47.20| 1| 2| 0|
| 001|1538204194|42.14| 0| 2| 0|
| 001|1538204195|39.20| 0| 2| 0|
| 001|1538204196|35.30| 0| 2| 0|
| 001|1538204197|32.22| 0| 2| 0|
| 001|1538204222|57.20| 1| 6| 0|
| 001|1538204223|54.60| 0| 6| 0|
| 001|1538204224|52.15| 0| 6| 0|
| 001|1538204225|49.27| 0| 6| 0|
| 001|1538204226|47.89| 0| 6| 0|
+-------+----------+-----+---+---+---+
Run Code Online (Sandbox Code Playgroud)
现在对于 df_1, group bytrip-id和d2,找到F.struct('timestamp', 'speed')将返回组中第一条和最后一条记录的最小值和最大值,从 中选择相应的字段struct以获得最终结果:
df_new = df_1.groupby('trip-id', 'd2').agg(
F.min(F.struct('timestamp', 'speed')).alias('start')
, F.max(F.struct('timestamp', 'speed')).alias('end')
).select(
'trip-id'
, F.col('start.timestamp').alias('start timestamp')
, F.col('end.timestamp').alias('end timestamp')
, F.col('start.speed').alias('start speed')
, F.col('end.speed').alias('end speed')
)
>>> df_new.show()
+-------+---------------+-------------+-----------+---------+
|trip-id|start timestamp|end timestamp|start speed|end speed|
+-------+---------------+-------------+-----------+---------+
| 001| 1538204193| 1538204197| 47.20| 32.22|
| 001| 1538204222| 1538204226| 57.20| 47.89|
+-------+---------------+-------------+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
注意:删除中间数据帧df_1,我们可以有以下内容:
df_new = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
.withColumn('d2', F.sum('d1').over(w1)) \
.withColumn('d3', F.min('d1').over(w2)) \
.where('d3 == 0') \
.groupby('trip-id', 'd2').agg(
F.min(F.struct('timestamp', 'speed')).alias('start')
, F.max(F.struct('timestamp', 'speed')).alias('end')
)\
.select(
'trip-id'
, F.col('start.timestamp').alias('start timestamp')
, F.col('end.timestamp').alias('end timestamp')
, F.col('start.speed').alias('start speed')
, F.col('end.speed').alias('end speed')
)
Run Code Online (Sandbox Code Playgroud)
希望这可以帮助。斯卡拉代码。
输出
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
代码
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._
scala> df.show
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
| 001|1538204192|44.55|
| 001|1538204193| 47.2|
| 001|1538204194|42.14|
| 001|1538204195| 39.2|
| 001|1538204196| 35.3|
| 001|1538204197|32.22|
| 001|1538204198| 34.8|
| 001|1538204199| 37.1|
| 001|1538204221| 55.3|
| 001|1538204222| 57.2|
| 001|1538204223| 54.6|
| 001|1538204224|52.15|
| 001|1538204225|49.27|
| 001|1538204226|47.89|
| 001|1538204227|50.57|
| 001|1538204228|53.72|
+-------+----------+-----+
val overColumns = Window.partitionBy("trip-id").orderBy("timestamp")
val breaksDF = df
.withColumn("speeddiff", lead("speed", 1).over(overColumns) - $"speed")
.withColumn("breaking", when($"speeddiff" < 0, 1).otherwise(0))
scala> breaksDF.show
+-------+----------+-----+-------------------+--------+
|trip-id| timestamp|speed| speeddiff|breaking|
+-------+----------+-----+-------------------+--------+
| 001|1538204192|44.55| 2.6500000000000057| 0|
| 001|1538204193| 47.2| -5.060000000000002| 1|
| 001|1538204194|42.14|-2.9399999999999977| 1|
| 001|1538204195| 39.2|-3.9000000000000057| 1|
| 001|1538204196| 35.3|-3.0799999999999983| 1|
| 001|1538204197|32.22| 2.5799999999999983| 0|
| 001|1538204198| 34.8| 2.3000000000000043| 0|
| 001|1538204199| 37.1| 18.199999999999996| 0|
| 001|1538204221| 55.3| 1.9000000000000057| 0|
| 001|1538204222| 57.2|-2.6000000000000014| 1|
| 001|1538204223| 54.6| -2.450000000000003| 1|
| 001|1538204224|52.15|-2.8799999999999955| 1|
| 001|1538204225|49.27|-1.3800000000000026| 1|
| 001|1538204226|47.89| 2.6799999999999997| 0|
| 001|1538204227|50.57| 3.1499999999999986| 0|
| 001|1538204228|53.72| null| 0|
+-------+----------+-----+-------------------+--------+
val outputDF = breaksDF
.withColumn("breakevent",
when(($"breaking" - lag($"breaking", 1).over(overColumns)) === 1, "start of break")
.when(($"breaking" - lead($"breaking", 1).over(overColumns)) === 1, "end of break"))
scala> outputDF.show
+-------+----------+-----+-------------------+--------+--------------+
|trip-id| timestamp|speed| speeddiff|breaking| breakevent|
+-------+----------+-----+-------------------+--------+--------------+
| 001|1538204192|44.55| 2.6500000000000057| 0| null|
| 001|1538204193| 47.2| -5.060000000000002| 1|start of break|
| 001|1538204194|42.14|-2.9399999999999977| 1| null|
| 001|1538204195| 39.2|-3.9000000000000057| 1| null|
| 001|1538204196| 35.3|-3.0799999999999983| 1| end of break|
| 001|1538204197|32.22| 2.5799999999999983| 0| null|
| 001|1538204198| 34.8| 2.3000000000000043| 0| null|
| 001|1538204199| 37.1| 18.199999999999996| 0| null|
| 001|1538204221| 55.3| 1.9000000000000057| 0| null|
| 001|1538204222| 57.2|-2.6000000000000014| 1|start of break|
| 001|1538204223| 54.6| -2.450000000000003| 1| null|
| 001|1538204224|52.15|-2.8799999999999955| 1| null|
| 001|1538204225|49.27|-1.3800000000000026| 1| end of break|
| 001|1538204226|47.89| 2.6799999999999997| 0| null|
| 001|1538204227|50.57| 3.1499999999999986| 0| null|
| 001|1538204228|53.72| null| 0| null|
+-------+----------+-----+-------------------+--------+--------------+
scala> outputDF.filter("breakevent is not null").select("trip-id", "timestamp", "speed", "breakevent").show
+-------+----------+-----+--------------+
|trip-id| timestamp|speed| breakevent|
+-------+----------+-----+--------------+
| 001|1538204193| 47.2|start of break|
| 001|1538204196| 35.3| end of break|
| 001|1538204222| 57.2|start of break|
| 001|1538204225|49.27| end of break|
+-------+----------+-----+--------------+
outputDF.filter("breakevent is not null").withColumn("breakID",
when($"breakevent" === "start of break", concat($"trip-id",$"timestamp"))
.when($"breakevent" === "end of break", concat($"trip-id", lag($"timestamp", 1).over(overColumns))))
.groupBy("breakID").agg(first($"timestamp") as "start timestamp", last($"timestamp") as "end timestamp", first($"speed") as "start speed", last($"speed") as "end speed").show
+-------------+---------------+-------------+-----------+---------+
| breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193| 1538204193| 1538204196| 47.2| 35.3|
|0011538204222| 1538204222| 1538204225| 57.2| 49.27|
+-------------+---------------+-------------+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
328 次 |
| 最近记录: |