Watermark and window produce only an empty batch in append mode

dej*_*jan 7 apache-spark spark-structured-streaming

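The Structured Streaming code below watermarks and windows the data, using a 24-hour window with a 15-minute slide. In append mode the code produces only an empty Batch 0. In update mode the results are displayed correctly. I need append mode because the S3 sink works only in append mode.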

String windowDuration = "24 hours";
String slideDuration = "15 minutes";
Dataset<Row> sliding24h = rowData
        .withWatermark(eventTimeCol, slideDuration)
        .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                col(nameCol)).count();

sliding24h
        .writeStream()
        .format("console")
        .option("truncate", false)
        .option("numRows", 1000)
        .outputMode(OutputMode.Append())
        //.outputMode(OutputMode.Complete())
        .start()
        .awaitTermination();

Here is the complete test code:

public static void main(String [] args) throws StreamingQueryException {
     SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

     ArrayList<String> rl = new ArrayList<>();
     for (int i = 0; i < 200; ++i) {
         long t = 1512164314L + i * 5 * 60;
         rl.add(t + ",qwer");
     }

     String nameCol = "name";
     String eventTimeCol = "eventTime";
     String eventTimestampCol = "eventTimestamp";

     MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING());
     input.addData(JavaConversions.asScalaBuffer(rl).toSeq());
     Dataset<Row> stream = input.toDF().selectExpr(
             "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol,
             "cast(split(value,'[,]')[1] as String) as " + nameCol);

     System.out.println("isStreaming: " +  stream.isStreaming());

     Column eventTime = functions.to_timestamp(col(eventTimestampCol));
     Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime);

     String windowDuration = "24 hours";
     String slideDuration = "15 minutes";
     Dataset<Row> sliding24h = rowData
             .withWatermark(eventTimeCol, slideDuration)
             .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration),
                     col(nameCol)).count();

     sliding24h
             .writeStream()
             .format("console")
             .option("truncate", false)
             .option("numRows", 1000)
             .outputMode(OutputMode.Append())
             //.outputMode(OutputMode.Complete())
             .start()
             .awaitTermination();
}
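
To reason about the test data above without running Spark, the following plain-Java sketch works through the window and watermark arithmetic. It rests on assumptions that are not confirmed anywhere in this post: that append mode emits a window only once the watermark has reached the window's end, that window starts are aligned to multiples of the slide since the epoch, and that the watermark is the largest event time seen so far minus the delay. The class and method names (`AppendModeSketch`, `closedWindows`, and so on) are made up for illustration.

```java
// Plain-Java sketch (no Spark): how many sliding windows could append mode emit?
final class AppendModeSketch {
    static final long WINDOW_SEC = 24L * 60 * 60; // "24 hours"
    static final long SLIDE_SEC = 15L * 60;       // "15 minutes"
    static final long DELAY_SEC = 15L * 60;       // watermark delay (slideDuration in the question)

    // Same synthetic event times as the test code: 200 events, 5 minutes apart.
    static long[] eventTimes() {
        long[] t = new long[200];
        for (int i = 0; i < t.length; ++i) {
            t[i] = 1512164314L + i * 5L * 60;
        }
        return t;
    }

    // Assumed watermark after a batch: largest event time seen minus the delay.
    static long watermarkAfter(long[] times) {
        long max = Long.MIN_VALUE;
        for (long t : times) {
            max = Math.max(max, t);
        }
        return max - DELAY_SEC;
    }

    // End of the earliest window containing t, assuming window starts are
    // multiples of the slide since the epoch.
    static long earliestWindowEnd(long t) {
        long latestStart = Math.floorDiv(t, SLIDE_SEC) * SLIDE_SEC; // latest start <= t
        long earliestStart = latestStart - WINDOW_SEC + SLIDE_SEC;
        return earliestStart + WINDOW_SEC;
    }

    // Number of non-empty windows whose end is <= watermark. Assumes the times
    // are ascending and dense enough that no window in between is empty,
    // which holds for the test data.
    static long closedWindows(long[] times, long watermark) {
        long firstEnd = earliestWindowEnd(times[0]);
        if (watermark < firstEnd) {
            return 0;
        }
        return Math.floorDiv(watermark - firstEnd, SLIDE_SEC) + 1;
    }

    public static void main(String[] args) {
        long[] times = eventTimes();
        long initialWatermark = 0L; // assumed value before any data has been seen
        long afterBatch0 = watermarkAfter(times);
        System.out.println("closed with initial watermark: "
                + closedWindows(times, initialWatermark));
        System.out.println("closed with watermark " + afterBatch0 + ": "
                + closedWindows(times, afterBatch0));
    }
}
```

Under these assumptions, no window is eligible while the watermark still has its initial value, and 65 windows become eligible once the watermark has advanced past the data of the single `addData` call. If the watermark only advances between batches, those windows could be emitted only by a later batch, which this test never triggers because no further data arrives.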