Posts by ato*_*tom

Executing separate streaming queries in Spark Structured Streaming

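I am trying to aggregate a stream using two different windows and print both results to the console. However, only the first streaming query prints anything; the tenSecsQ query never prints to the console.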

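import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;

// Assumption: `host` (String) and `port` (int) are defined earlier, e.g. parsed from args
SparkSession spark = SparkSession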
    .builder()
    .appName("JavaStructuredNetworkWordCountWindowed")
    .config("spark.master", "local[*]")
    .getOrCreate();

Dataset<Row> lines = spark
    .readStream()
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", true)
    .load();

Dataset<Row> words = lines
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
    .toDF("word", "timestamp");

// 5 second window
Dataset<Row> fiveSecs = words
    .groupBy(
         functions.window(words.col("timestamp"), "5 seconds"),
         words.col("word")
    ).count().orderBy("window");

// 10 second window
Dataset<Row> tenSecs = words
    .groupBy(
          functions.window(words.col("timestamp"), "10 seconds"),
          words.col("word")
    ).count().orderBy("window");
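For readers less familiar with `functions.window`: with a single duration argument it creates tumbling (non-overlapping) windows, so each event lands in exactly one 5-second window for `fiveSecs` and, independently, in exactly one 10-second window for `tenSecs`. The sketch below is a minimal plain-Java model (no Spark required) of what `groupBy(window(timestamp, size), word).count().orderBy("window")` computes, assuming windows are aligned to the Unix epoch, which is Spark's default when no `startTime` is given. `TumblingWindowSketch`, `windowStart` and `countByWindow` are hypothetical names for illustration only; this is not Spark's implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of groupBy(window(ts, size), word).count() for tumbling windows.
public class TumblingWindowSketch {

    // Start (epoch millis) of the tumbling window of length windowMs that contains tsMs.
    public static long windowStart(long tsMs, long windowMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return tsMs - Math.floorMod(tsMs, windowMs);
    }

    // Counts events per (window start, word); the outer TreeMap keeps windows sorted,
    // mirroring orderBy("window") in the question.
    public static Map<Long, Map<String, Long>> countByWindow(
            List<String> words, List<Long> timestampsMs, long windowMs) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < words.size(); i++) {
            long start = windowStart(timestampsMs.get(i), windowMs);
            result.computeIfAbsent(start, k -> new TreeMap<>())
                  .merge(words.get(i), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a", "a");
        List<Long> ts = List.of(1_000L, 4_000L, 6_000L, 12_000L);
        System.out.println("5 s:  " + countByWindow(words, ts, 5_000L));
        System.out.println("10 s: " + countByWindow(words, ts, 10_000L));
    }
}
```

With these sample events the 5-second aggregation has three windows (starting at 0, 5000 and 10000 ms) while the 10-second one has only two (0 and 10000 ms): the event at 6000 ms falls into a different bucket depending on the window size, so the two queries produce genuinely different result tables.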

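Here I start the streaming queries for the 5 s and 10 s aggregated streams. The output of the 10 s stream is never printed; only the 5 s results appear on the console.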

// Start writeStream() for 5s window
StreamingQuery fiveSecQ = fiveSecs.writeStream()
    .queryName("5_secs")
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start();

// Start …
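Both queries use `outputMode("complete")`. As a rough mental model, the sketch below (plain Java, no Spark; `CompleteModeSketch` and `trigger` are hypothetical names) keeps the aggregation state across micro-batches and returns the entire result table after every trigger, including rows that did not change. It only models the output semantics of complete mode, not how Spark's state store or console sink are actually implemented.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of outputMode("complete"): state persists across triggers and
// the whole result table is emitted after each micro-batch.
public class CompleteModeSketch {
    // Running count per word; survives between micro-batches
    private final Map<String, Long> state = new TreeMap<>();

    // Folds one micro-batch into the state and returns a snapshot of the full table.
    public Map<String, Long> trigger(List<String> microBatch) {
        for (String word : microBatch) {
            state.merge(word, 1L, Long::sum);
        }
        // Complete mode: every row is returned, changed or not (copy so later batches don't mutate it)
        return new TreeMap<>(state);
    }

    public static void main(String[] args) {
        CompleteModeSketch query = new CompleteModeSketch();
        System.out.println("Batch 1: " + query.trigger(List.of("a", "b")));
        System.out.println("Batch 2: " + query.trigger(List.of("a")));
    }
}
```

In `main`, the second batch contains only `a`, yet the emitted table still includes `b=1`; that full re-emission is what the console sink shows on each trigger in complete mode.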

apache-spark spark-structured-streaming
