Moz*_*aki 6 apache-spark apache-spark-sql pyspark spark-structured-streaming
我正在编写一个Spark结构化流程序。我需要创建一个带有滞后差的附加列。
为了重现我的问题,我提供了代码片段。此代码使用data.json存储在data文件夹中的文件:
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
Run Code Online (Sandbox Code Playgroud)
码:
[
{"id": 77,"type": "person","timestamp": 1532609003},
{"id": 77,"type": "person","timestamp": 1532609005},
{"id": 78,"type": "crane","timestamp": 1532609005}
]
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
pyspark.sql.utils.AnalysisException:流数据帧/数据集不支持基于非时间的窗口;; \ nWindow [lag(timestamp#71L,1,null)windowspecdefinition(host_id#68,timestamp#71L ASC NULLS首先,第1行和第1行之间的行)为prev_timestamp#129L]
pyspark.sql.utils.AnalysisException:u'流数据帧/数据集不支持基于时间的窗口
这意味着您的窗口应该基于一timestamp列。因此,如果您每秒都有一个数据点,并且您创建一个30s带有strideof 的窗口10s,则生成的窗口将创建一个新window列,其中start和end列将包含差异为 的时间戳30s。
您应该以这种方式使用该窗口:
words = words.withColumn('date_time', F.col('date_time').cast('timestamp'))
w = F.window('date_time', '30 seconds', '10 seconds')
words = words \
.withWatermark('date_format', '1 minutes') \
.groupBy(w).agg(F.mean('value'))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
223 次 |
| 最近记录: |