Akka流-通过分组关闭流而不丢失数据

sta*_*kov 2 scala akka-stream

我有一个对元素进行分组的源和一个发出批处理请求的接收器,我使用KillSwitch能够在任意时间点关闭图形。switch.shutdown()调用时源输出丢失的最新不完整批次记录的问题

val source = Source.tick(10.millis, 10.millis, "tick").grouped(500)

val (switch, _) = source.viaMat(KillSwitches.single)(Keep.right)
.toMat(sink)(Keep.both).run()

Thread.sleep(3000) // wait some arbitrary time

switch.shutdown()
Run Code Online (Sandbox Code Playgroud)

关机时有办法“清除”未完成的批次吗?

Ste*_*tti 5

根据其文档,kill开关关闭的行为是位置性的

调用[[UniqueKillSwitch#shutdown()]]后,具体化为[[UniqueKillSwitch]]的[[FlowShape]]的[[Graph]]的运行实例将完成其下游并取消其上游(除非完成或失败)在这种情况下,该命令将被忽略)。

参见更多文档在这里

现在,grouped舞台将仅在完成时发出部分填充的组,而在取消时不会发出。

这意味着下图( killswitch 之前分组)的行为将与您观察到的一样

  val switch = 
    Source.tick(10.millis, 175.millis, "tick")
          .grouped(10)
          .viaMat(KillSwitches.single)(Keep.right)
          .toMat(Sink.foreach(println))(Keep.left)
          .run()
Run Code Online (Sandbox Code Playgroud)

而下图( killswitch 之后分组)将在完成时向下游发射部分组

  val switch =
    Source.tick(10.millis, 175.millis, "tick")
          .viaMat(KillSwitches.single)(Keep.right)
          .grouped(10)
          .toMat(Sink.foreach(println))(Keep.left)
          .run()
Run Code Online (Sandbox Code Playgroud)