Spark结构化流多个WriteStream到同一个接收器

Shi*_*M V 4 scala apache-spark slick-3.0 spark-structured-streaming

WritestreamSpark Structured Streaming 2.2.1中没有按顺序发生两个到同一个数据库接收器.请建议如何按顺序执行它们.

val deleteSink = ds1.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

使用上面的代码,deleteSink之后执行UpsertSink.

Shi*_*kou 8

如果您想要并行运行两个流,则必须使用

sparkSession.streams.awaitAnyTermination()
Run Code Online (Sandbox Code Playgroud)

代替

deleteSink.awaitTermination()
UpsertSink.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

在你的情况下,除非删除deleteSink或抛出异常,否则UpsertSink永远不会启动,正如scaladoc中所说的那样

等待由异常或异常终止this查询query.stop().如果查询以异常终止,则抛出异常.如果查询已终止,则对此方法的所有后续调用将立即返回(如果查询被终止stop()),或立即抛出异常(如果查询已终止,则为异常).