Ero*_*osC 7 scala stream reactive-programming akka
我正在设计一个生成CSV测试数据的小工具.我想使用Akka Streams(1.0-RC4)来实现数据流.将有一个Source生成随机数,转换为CSV字符串,一些速率限制器和一个写入文件的接收器.
还应该有一个使用小型REST接口停止工具的干净方法.
这是我在努力的地方.流启动后(RunnableFlow.run())似乎无法阻止它.Source和Sink是无限的(至少在磁盘运行完全:) :)所以他们不会停止流.
向Source或Sink添加控制逻辑感觉不对.也使用ActorSystem.shutdown().什么是阻止流的好方法?
Ero*_*osC 10
好的,我找到了一个不错的解决方案.它已经坐在我的鼻子底下,我只是没有看到它.Source.lazyEmpty
具体化为一个承诺,当完成后将终止源和它背后的流.
剩下的问题是,如何将其包含在无限的随机数流中.我试过了Zip
.结果是没有随机数通过流,因为lazyEmpty
从不发出值(doh).我尝试了Merge
但是流程从未终止,因为Merge
一直持续到所有来源都已完成.
所以我写了自己的合并.它从输入端口中的一个转发所有的值和当终止任何源完成.
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
}
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(), Attributes.name("StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read(p.in)) { (ctx, input, element) =>
ctx.emit(element)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(stopperMerge.in, stopperMerge.out)
}
}
}
Run Code Online (Sandbox Code Playgroud)
流可以插入任何流.实现后,它将返回一个Promise
在完成时终止流.这是我对它的测试.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
Run Code Online (Sandbox Code Playgroud)
我希望这对其他人也有用.仍然让我感到困惑的是,为什么没有内置的方法来终止来自流外部的流.