我想创建一个图表,在下沉之前循环n次.我刚刚创建了这个满足我要求的样本,但是在下沉之后并没有结束,我真的不明白为什么.有人可以开导我吗?
谢谢.
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, UniformFanOutShape}
import scala.concurrent.Future
object test {
def main(args: Array[String]) {
val ignore: Sink[Any, Future[Unit]] = Sink.ignore
val closed: RunnableGraph[Future[Unit]] = FlowGraph.closed(ignore) { implicit b =>
sink => {
import FlowGraph.Implicits._
val fileSource = Source.single((0, Array[String]()))
val merge = b.add(MergePreferred[(Int, Array[String])](1).named("merge"))
val afterMerge = Flow[(Int, Array[String])].map {
e =>
println("after merge")
e
}
val broadcastArray: UniformFanOutShape[(Int, Array[String]), (Int, Array[String])] = b.add(Broadcast[(Int, Array[String])](2).named("broadcastArray"))
val toRetry = Flow[(Int, Array[String])].filter {
case (r, s) => { …
Run Code Online (Sandbox Code Playgroud) 我已经使用 monix 和 akka-streams 对 List[ClassA] 到 List[ClassB] 的映射进行了基准测试,但我不明白为什么它这么慢。
我尝试了不同的映射方式,这是 JMH 的结果:
[info] Benchmark Mode Cnt Score Error Units
[info] MappingBenchmark.akkaLoadBalanceMap ss 20 742,626 â–’ 4,853 ms/op
[info] MappingBenchmark.akkaMapAsyncFold ss 20 480,460 â–’ 8,493 ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync ss 20 331,398 â–’ 10,490 ms/op
[info] MappingBenchmark.akkaMapFold ss 20 713,500 â–’ 7,394 ms/op
[info] MappingBenchmark.akkaMapFoldAsync ss 20 313,275 â–’ 8,716 ms/op
[info] MappingBenchmark.map ss 20 0,567 â–’ 0,175 ms/op
[info] MappingBenchmark.monixBatchedObservables ss 20 259,736 â–’ 5,939 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft ss 20 456,310 …
Run Code Online (Sandbox Code Playgroud)