如何计算Spark结构化流中的滞后差异?

Moz*_*aki 6 apache-spark apache-spark-sql pyspark spark-structured-streaming

我正在编写一个Spark结构化流程序。我需要创建一个带有滞后差的附加列。

为了重现我的问题,我提供了代码片段。此代码使用data.json存储在data文件夹中的文件:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]
Run Code Online (Sandbox Code Playgroud)

码:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

pyspark.sql.utils.AnalysisException:流数据帧/数据集不支持基于非时间的窗口;; \ nWindow [lag(timestamp#71L,1,null)windowspecdefinition(host_id#68,timestamp#71L ASC NULLS首先,第1行和第1行之间的行)为prev_timestamp#129L]

pis*_*all 1

pyspark.sql.utils.AnalysisException:u'流数据帧/数据集不支持基于时间的窗口

这意味着您的窗口应该基于一timestamp列。因此,如果您每秒都有一个数据点,并且您创建一个30s带有strideof 的窗口10s,则生成的窗口将创建一个新window列,其中startend列将包含差异为 的时间戳30s

您应该以这种方式使用该窗口:

words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))

w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
   .withWatermark('date_format', '1 minutes') \
   .groupBy(w).agg(F.mean('value'))
Run Code Online (Sandbox Code Playgroud)