Spark: how to aggregate/reduce records based on time difference?

Asked by Shu*_*ail · score 5 · tags: dataframe, apache-spark, rdd, apache-spark-sql, pyspark

I have CSV time-series data from vehicles, containing the following information:

  • Trip ID
  • Timestamp
  • Speed

The data looks like this:

trip-id | timestamp  | speed

001     | 1538204192 | 44.55
001     | 1538204193 | 47.20 <-- start of brake
001     | 1538204194 | 42.14
001     | 1538204195 | 39.20
001     | 1538204196 | 35.30
001     | 1538204197 | 32.22 <-- end of brake
001     | 1538204198 | 34.80
001     | 1538204199 | 37.10
...
001     | 1538204221 | 55.30
001     | 1538204222 | 57.20 <-- start of brake
001     | 1538204223 | 54.60
001     | 1538204224 | 52.15
001     | 1538204225 | 49.27
001     | 1538204226 | 47.89 <-- end of brake
001     | 1538204227 | 50.57
001     | 1538204228 | 53.72
...

A braking event occurs when the speed decreases between 2 consecutive records (ordered by timestamp).

I want to extract these braking events from the data in terms of their start timestamp, end timestamp, start speed and end speed, e.g.:

+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

Here is my take on it:

  1. Defined a window spec partitioned by trip-id and ordered by timestamp.
  2. Applied lag over the window to shift across consecutive rows and compute the speed difference.
  3. Filtered out the records with a positive speed difference, since I am only interested in braking events.
  4. Now that only the records belonging to braking events are left, I want to group the records that belong to the same event. I think I can do this based on the timestamp difference: if the difference between 2 records is 1 second, those 2 records belong to the same braking event.

I am stuck here because I do not have a key shared by the records of the same group, so I cannot apply a key-based aggregation.

My questions are:

  1. How can I map over the timestamp differences to add a key column? If 2 records are 1 second apart, they should share a common key, so that I can reduce each group based on the newly added key (see the sketch after this list).

  2. Is there a better, more optimized way to achieve this? My approach could be quite inefficient, since it requires row-by-row comparison. What other possible ways are there to detect these "sub-events" (e.g. braking events) within a data stream that belongs to a specific event (the data from a single vehicle trip)?
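
For reference, a minimal pyspark sketch of the approach described above (steps 1-3 plus a gap-based key for step 4) might look like the following. It assumes the dataframe is named df with the columns shown earlier; speed_diff, gap and event_key are hypothetical names, not from the original post:

from pyspark.sql import functions as F, Window

# Step 1: window spec partitioned by trip-id, ordered by timestamp
w = Window.partitionBy('trip-id').orderBy('timestamp')

# Steps 2-3: speed difference vs. the previous row, keep only decreases
braking = df \
    .withColumn('speed_diff', F.col('speed') - F.lag('speed').over(w)) \
    .where('speed_diff < 0')

# Step 4: among the remaining rows, a gap of more than 1 second to the
# previous braking record starts a new event; a running sum of that flag
# gives every row of an event the same key
braking = braking \
    .withColumn('gap', F.col('timestamp') - F.lag('timestamp').over(w)) \
    .withColumn('event_key',
                F.sum(F.when(F.col('gap').isNull() | (F.col('gap') > 1), 1)
                       .otherwise(0)).over(w))

# Caveat: filtering on a lag-based speed_diff drops the first (peak) record
# of each event (e.g. the 47.20 row), so the start speed would be lost;
# both answers below work around this.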

Thanks in advance!


Answers:

Answered by jxc*_*jxc · score 5

For Pandas users, there is a fairly common programming pattern that uses shift() + cumsum() to set up a group label identifying consecutive rows that match a specific pattern/condition. With pyspark, we can use the Window functions lag() + sum() to do the same and find this group label (d2 in the code below):
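
For comparison, here is a minimal pandas sketch of that shift() + cumsum() pattern; the single-trip frame pdf is a made-up illustration (on real data you would apply the same logic per trip-id group):

import pandas as pd

# Hypothetical single-trip sample with the question's columns
pdf = pd.DataFrame({
    'timestamp': [1538204192, 1538204193, 1538204194,
                  1538204195, 1538204196, 1538204197],
    'speed': [44.55, 47.20, 42.14, 39.20, 35.30, 32.22],
})

# d1 = 0 where the previous speed is greater (a drop), 1 otherwise;
# the cumulative sum of d1 then labels each run of drops with one number
d1 = (~(pdf['speed'].shift() > pdf['speed'])).astype(int)
pdf['d2'] = d1.cumsum()
# d2 -> [1, 2, 2, 2, 2, 2]: rows sharing a label form one group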

Data setup:

from pyspark.sql import functions as F, Window

>>> df.orderBy('timestamp').show()
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
|    001|1538204192|44.55|
|    001|1538204193|47.20|
|    001|1538204194|42.14|
|    001|1538204195|39.20|
|    001|1538204196|35.30|
|    001|1538204197|32.22|
|    001|1538204198|34.80|
|    001|1538204199|37.10|
|    001|1538204221|55.30|
|    001|1538204222|57.20|
|    001|1538204223|54.60|
|    001|1538204224|52.15|
|    001|1538204225|49.27|
|    001|1538204226|47.89|
|    001|1538204227|50.57|
|    001|1538204228|53.72|
+-------+----------+-----+

>>> df.printSchema()
root
 |-- trip-id: string (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- speed: double (nullable = true)

Set up two Window Specs (w1, w2):

# Window spec used to find previous speed F.lag('speed').over(w1) and also do the cumsum() to find flag `d2`
w1 = Window.partitionBy('trip-id').orderBy('timestamp')

# Window spec used to find the minimal value of flag `d1` over the partition(`trip-id`,`d2`)
w2 = Window.partitionBy('trip-id', 'd2').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

Three flags (d1, d2, d3):

  • d1 : flag identifying whether the previous speed is greater than the current speed; if true then d1 = 0, otherwise d1 = 1
  • d2 : flag marking the consecutive rows of a speed drop with the same unique number
  • d3 : flag identifying the minimal value of d1 over the partition ('trip-id', 'd2'); the row belongs to a group with a speed drop only when d3 == 0. This will be used to filter out the unrelated rows

df_1 = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
         .withColumn('d2', F.sum('d1').over(w1)) \
         .withColumn('d3', F.min('d1').over(w2))

>>> df_1.orderBy('timestamp').show()
+-------+----------+-----+---+---+---+
|trip-id| timestamp|speed| d1| d2| d3|
+-------+----------+-----+---+---+---+
|    001|1538204192|44.55|  1|  1|  1|
|    001|1538204193|47.20|  1|  2|  0|
|    001|1538204194|42.14|  0|  2|  0|
|    001|1538204195|39.20|  0|  2|  0|
|    001|1538204196|35.30|  0|  2|  0|
|    001|1538204197|32.22|  0|  2|  0|
|    001|1538204198|34.80|  1|  3|  1|
|    001|1538204199|37.10|  1|  4|  1|
|    001|1538204221|55.30|  1|  5|  1|
|    001|1538204222|57.20|  1|  6|  0|
|    001|1538204223|54.60|  0|  6|  0|
|    001|1538204224|52.15|  0|  6|  0|
|    001|1538204225|49.27|  0|  6|  0|
|    001|1538204226|47.89|  0|  6|  0|
|    001|1538204227|50.57|  1|  7|  1|
|    001|1538204228|53.72|  1|  8|  1|
+-------+----------+-----+---+---+---+
    

Remove the rows we don't care about:

df_1 = df_1.where('d3 == 0')

>>> df_1.orderBy('timestamp').show()
+-------+----------+-----+---+---+---+
|trip-id| timestamp|speed| d1| d2| d3|
+-------+----------+-----+---+---+---+
|    001|1538204193|47.20|  1|  2|  0|
|    001|1538204194|42.14|  0|  2|  0|
|    001|1538204195|39.20|  0|  2|  0|
|    001|1538204196|35.30|  0|  2|  0|
|    001|1538204197|32.22|  0|  2|  0|
|    001|1538204222|57.20|  1|  6|  0|
|    001|1538204223|54.60|  0|  6|  0|
|    001|1538204224|52.15|  0|  6|  0|
|    001|1538204225|49.27|  0|  6|  0|
|    001|1538204226|47.89|  0|  6|  0|
+-------+----------+-----+---+---+---+

Final step:

Now, for df_1, group by trip-id and d2. Taking F.min and F.max of F.struct('timestamp', 'speed') returns the first and the last record of each group (structs compare field by field, so the ordering is driven by timestamp first). Then select the corresponding fields from the struct to get the final result:

df_new = df_1.groupby('trip-id', 'd2').agg(
          F.min(F.struct('timestamp', 'speed')).alias('start')
        , F.max(F.struct('timestamp', 'speed')).alias('end')
).select(
      'trip-id'
    , F.col('start.timestamp').alias('start timestamp')
    , F.col('end.timestamp').alias('end timestamp')
    , F.col('start.speed').alias('start speed')
    , F.col('end.speed').alias('end speed')
)

>>> df_new.show()
+-------+---------------+-------------+-----------+---------+
|trip-id|start timestamp|end timestamp|start speed|end speed|
+-------+---------------+-------------+-----------+---------+
|    001|     1538204193|   1538204197|      47.20|    32.22|
|    001|     1538204222|   1538204226|      57.20|    47.89|
+-------+---------------+-------------+-----------+---------+

Note: by skipping the intermediate dataframe df_1, we can chain everything as follows:

df_new = df.withColumn('d1', F.when(F.lag('speed').over(w1) > F.col('speed'), 0).otherwise(1))\
           .withColumn('d2', F.sum('d1').over(w1)) \
           .withColumn('d3', F.min('d1').over(w2)) \
           .where('d3 == 0') \
           .groupby('trip-id', 'd2').agg(
                F.min(F.struct('timestamp', 'speed')).alias('start')
              , F.max(F.struct('timestamp', 'speed')).alias('end')
            )\
           .select(
                'trip-id'
              , F.col('start.timestamp').alias('start timestamp')
              , F.col('end.timestamp').alias('end timestamp')
              , F.col('start.speed').alias('start speed')
              , F.col('end.speed').alias('end speed')
            )


Answered by C.S*_*lly · score 2

Hope this helps. Scala code below.

Output:

+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+

Code:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.WindowSpec
import org.apache.spark.sql.functions._

scala> df.show
+-------+----------+-----+
|trip-id| timestamp|speed|
+-------+----------+-----+
|    001|1538204192|44.55|
|    001|1538204193| 47.2|
|    001|1538204194|42.14|
|    001|1538204195| 39.2|
|    001|1538204196| 35.3|
|    001|1538204197|32.22|
|    001|1538204198| 34.8|
|    001|1538204199| 37.1|
|    001|1538204221| 55.3|
|    001|1538204222| 57.2|
|    001|1538204223| 54.6|
|    001|1538204224|52.15|
|    001|1538204225|49.27|
|    001|1538204226|47.89|
|    001|1538204227|50.57|
|    001|1538204228|53.72|
+-------+----------+-----+

val overColumns = Window.partitionBy("trip-id").orderBy("timestamp")
val breaksDF = df
  .withColumn("speeddiff", lead("speed", 1).over(overColumns) - $"speed")
  .withColumn("breaking", when($"speeddiff" < 0, 1).otherwise(0))

scala> breaksDF.show
+-------+----------+-----+-------------------+--------+
|trip-id| timestamp|speed|          speeddiff|breaking|
+-------+----------+-----+-------------------+--------+
|    001|1538204192|44.55| 2.6500000000000057|       0|
|    001|1538204193| 47.2| -5.060000000000002|       1|
|    001|1538204194|42.14|-2.9399999999999977|       1|
|    001|1538204195| 39.2|-3.9000000000000057|       1|
|    001|1538204196| 35.3|-3.0799999999999983|       1|
|    001|1538204197|32.22| 2.5799999999999983|       0|
|    001|1538204198| 34.8| 2.3000000000000043|       0|
|    001|1538204199| 37.1| 18.199999999999996|       0|
|    001|1538204221| 55.3| 1.9000000000000057|       0|
|    001|1538204222| 57.2|-2.6000000000000014|       1|
|    001|1538204223| 54.6| -2.450000000000003|       1|
|    001|1538204224|52.15|-2.8799999999999955|       1|
|    001|1538204225|49.27|-1.3800000000000026|       1|
|    001|1538204226|47.89| 2.6799999999999997|       0|
|    001|1538204227|50.57| 3.1499999999999986|       0|
|    001|1538204228|53.72|               null|       0|
+-------+----------+-----+-------------------+--------+


val outputDF = breaksDF
  .withColumn("breakevent", 
    when(($"breaking" - lag($"breaking", 1).over(overColumns)) === 1, "start of break")
    .when(($"breaking" - lead($"breaking", 1).over(overColumns)) === 1, "end of break"))

scala> outputDF.show
+-------+----------+-----+-------------------+--------+--------------+
|trip-id| timestamp|speed|          speeddiff|breaking|    breakevent|
+-------+----------+-----+-------------------+--------+--------------+
|    001|1538204192|44.55| 2.6500000000000057|       0|          null|
|    001|1538204193| 47.2| -5.060000000000002|       1|start of break|
|    001|1538204194|42.14|-2.9399999999999977|       1|          null|
|    001|1538204195| 39.2|-3.9000000000000057|       1|          null|
|    001|1538204196| 35.3|-3.0799999999999983|       1|  end of break|
|    001|1538204197|32.22| 2.5799999999999983|       0|          null|
|    001|1538204198| 34.8| 2.3000000000000043|       0|          null|
|    001|1538204199| 37.1| 18.199999999999996|       0|          null|
|    001|1538204221| 55.3| 1.9000000000000057|       0|          null|
|    001|1538204222| 57.2|-2.6000000000000014|       1|start of break|
|    001|1538204223| 54.6| -2.450000000000003|       1|          null|
|    001|1538204224|52.15|-2.8799999999999955|       1|          null|
|    001|1538204225|49.27|-1.3800000000000026|       1|  end of break|
|    001|1538204226|47.89| 2.6799999999999997|       0|          null|
|    001|1538204227|50.57| 3.1499999999999986|       0|          null|
|    001|1538204228|53.72|               null|       0|          null|
+-------+----------+-----+-------------------+--------+--------------+


scala> outputDF.filter("breakevent is not null").select("trip-id", "timestamp", "speed", "breakevent").show
+-------+----------+-----+--------------+
|trip-id| timestamp|speed|    breakevent|
+-------+----------+-----+--------------+
|    001|1538204193| 47.2|start of break|
|    001|1538204196| 35.3|  end of break|
|    001|1538204222| 57.2|start of break|
|    001|1538204225|49.27|  end of break|
+-------+----------+-----+--------------+

outputDF.filter("breakevent is not null").withColumn("breakID", 
  when($"breakevent" === "start of break", concat($"trip-id",$"timestamp"))
  .when($"breakevent" === "end of break", concat($"trip-id", lag($"timestamp", 1).over(overColumns))))
  .groupBy("breakID").agg(first($"timestamp") as "start timestamp", last($"timestamp") as "end timestamp", first($"speed") as "start speed", last($"speed") as "end speed").show


+-------------+---------------+-------------+-----------+---------+
|      breakID|start timestamp|end timestamp|start speed|end speed|
+-------------+---------------+-------------+-----------+---------+
|0011538204193|     1538204193|   1538204196|       47.2|     35.3|
|0011538204222|     1538204222|   1538204225|       57.2|    49.27|
+-------------+---------------+-------------+-----------+---------+