适当的方式来阻止Akka Streams的条件

jar*_*daf 13 scala akka-stream

我已经成功使用FileIO来传输文件的内容,为每一行计算一些转换并聚合/减少结果.

现在我有一个非常具体的用例,我希望在达到条件时停止流,这样就不必读取整个文件,但过程会尽快完成.推荐的方法是什么?

Kon*_*ski 22

如果停止条件是"在流的外面"

KillSwitch你可以使用一个高级构建块来执行此操作:http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html一旦杀死,流将被关闭切换通知.

它有abort(reason)/ shutdownetc之类的方法,请看这里的API:http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

参考文档在这里:http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

示例用法是:

val countingSrc = Source(Stream.from(1)).delay(1.second,
    DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

killSwitch.shutdown()

Await.result(last, 1.second) shouldBe 2
Run Code Online (Sandbox Code Playgroud)

如果停止条件在流内

你可以takeWhile用来表达任何条件,虽然有时take或者limit也可能足够"拿10个lnes".

如果您的逻辑非常先进,您可以构建一个特殊的阶段来处理使用的特殊逻辑statefulMapConcat,允许表达任何内容 - 因此您可以随时"从内部"完成流.

  • 如果需要为连续流打开和关闭,以便我的reduce函数调用将聚合的(分组的)数据发送到Sink,该怎么办?我可以使用takeWhile(p)和swithc p true和false吗? (2认同)