Edu*_*rdo 7 scala akka akka-stream
我的程序中有几个Flow,我想并行处理.一切都完成后,我想触发一些行动.
执行此操作的一种方法是在每次完成后向Actor发送消息,并且当Actor验证所有流已准备好时,它可以触发该操作.
我想知道是否有一些我可能会忽略的akka-streams Scala DSL中的任何东西会使它更简单.
编辑:将流转换为未来是行不通的,因为正如文档所提到的,Future在流中发生的第一个事件之后立即完成.运行以下代码:
implicit val system = ActorSystem("Sys")
val fm = FlowMaterializer(MaterializerSettings())
def main(args: Array[String]): Unit = {
val fut = Flow(1 second, {() => println("tick")}).toFuture(fm)
fut.onComplete{ _ =>
println("future completed")
}
}
Run Code Online (Sandbox Code Playgroud)
打印"tick",然后是"future completed",然后是"tick"的无限序列.
正如评论中所提到的,我相信@Eduardo是正确的将转换Flow为a Future.考虑这个例子:
implicit val system = ActorSystem("Sys")
import system.dispatcher
val text1 =
"""hello1world
foobar""".stripMargin
val text2 =
"""this1is
a1test""".stripMargin
def flowFut(text:String) = Flow(text.split("\\s").toVector)
.map(_.toUpperCase())
.map(_.replace("1", ""))
.toFuture(FlowMaterializer(MaterializerSettings()))
val fut1 = flowFut(text1)
val fut2 = flowFut(text2)
val fut3 = for{
f1 <- fut1
f2 <- fut2
} yield {
s"$f1, $f2"
}
fut3 foreach {println(_)}
Run Code Online (Sandbox Code Playgroud)
在这里,我在每组文本行上运行两个单独的变换,转换为upper并从任何文本中删除#1.然后,我将结果强制Flow为一个,Future这样我就可以将结果组合成一个新的Future,然后打印出来.
| 归档时间: |
|
| 查看次数: |
4001 次 |
| 最近记录: |