使用带水印的附加输出模式时的结构化流异常

Ray*_*y J 8 java apache-spark spark-structured-streaming

尽管我正在使用withWatermark(),但是当我运行我的spark工作时,我收到以下错误消息:

线程"main"中的异常org.apache.spark.sql.AnalysisException:当没有水印的流式DataFrames/DataSets上有流式聚合时,不支持追加输出模式;;

从我在编程指南中看到的内容,这与预期用法(和示例代码)完全匹配.有谁知道什么可能是错的?

提前致谢!

相关代码(Java 8,Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();
Run Code Online (Sandbox Code Playgroud)

zsx*_*ing 15

问题在于parsed.col.替换它将col解决问题.我建议总是使用col函数而不是Dataset.col.

Dataset.col返回resolved columncol返回unresolved column.

parsed.withWatermark("timestamp", "10 minutes")将使用具有相同名称的新列创建新的数据集.水印信息附加timestamp在新数据集的列中,而不是parsed.col("timestamp"),因此列中groupBy没有水印.

当您使用未解析的列时,Spark会为您找出正确的列.