当无水印的流式 DataFrame/DataSet 上存在流式聚合时,不支持追加输出模式

Ran*_*Guy 1 java apache-spark spark-streaming spark-structured-streaming

我有一个 kafka 流,正在加载到 Spark。来自 Kafka 主题的消息具有以下属性:bl_ibanblacklistedtimestamp。因此,有 IBANS、关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,并且还有该记录的时间戳。问题是一个 IBAN 可以有多个记录,因为超时的 IBAN 可能会被列入黑名单或“删除”。我想要实现的目标是了解每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是列出每个最新的 IBAN timestamp(之后我也想添加blacklisted状态),所以我生成了以下代码(其中黑名单代表我从 Kafka 加载的数据集):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后我尝试使用以下代码将其打印到控制台:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();
Run Code Online (Sandbox Code Playgroud)

我已经运行我的代码并收到以下错误: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

所以我将水印添加到我的数据集中,如下所示:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后又出现同样的错误。我有什么想法可以解决这个问题吗?


更新:在迈克的帮助下,我成功地摆脱了这个错误。但问题是我仍然无法让我的黑名单发挥作用。我可以看到数据是如何从 Kafka 加载的,但之后从我的组操作中我得到了两个空批次,仅此而已。从Kafka打印的数据:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N          |2020-04-10 17:26:58.208|
|SK341492788657560898224|N          |2020-04-10 17:26:58.214|
|SK118866580129485701645|N          |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

这就是我获取输出黑名单的方法:

blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
                .selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");
Run Code Online (Sandbox Code Playgroud)

这是我的团体操作:

Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
                .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

源文件链接:Spark Blacklist

mik*_*ike 5

当您在 Spark 中使用水印时,您需要确保您的聚合了解窗口。Spark文档提供了更多背景知识。

在你的情况下,代码应该看起来像这样

blackList = blackList.withWatermark("timestamp", "2 seconds")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
  .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

重要的是,该属性timestamp具有时间戳数据类型!