2015-10-30更新
基于Roland Kuhn Awnser:
Akka Streams在Actors之间使用异步消息传递来实现流处理阶段.在异步边界上传递数据有一个开销,你在这里看到:你的计算似乎只需要大约160ns(来自单线程测量),而流式解决方案每个元素需要大约1μs,这是由消息传递决定的.
另一个误解是说"流"意味着并行性:在你的代码中,所有计算都在一个Actor(映射阶段)中顺序运行,因此对原始单线程解决方案没有任何好处.
为了从Akka Streams提供的并行性中受益,您需要具有多个处理阶段,每个阶段都执行任务
每个元素1μs,另见文档.
我做了一些改变.我的代码现在看起来像:
object MultiThread {
implicit val actorSystem = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
var counter = 0
var oldProgess = 0
//RunnableFlow: in -> flow -> sink
val in = Source(() => Iterator.continually((1254785478l, "name", 48, 23.09f)))
val flow = Flow[(Long, String, Int, Float)].map(p => SharedFunctions.transform2(SharedFunctions.transform(p)))
val tupleToEvent = Flow[(Long, String, Int, Float)].map(SharedFunctions.transform)
val eventToFactorial = Flow[Event].map(SharedFunctions.transform2)
val eventChef: Flow[(Long, String, Int, Float), Int, Unit] = Flow() { implicit builder …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用*subFlows连接流.因此,我从广播的出口建立了一个来源.但它抛出了一个UnsupportedOperationException: cannot replace the shape of the EmptyModule.我试图谷歌这个例外,但我找不到类似的东西.
在这里我的代码
val aggFlow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val broadcast = builder.add(Broadcast[MonitoringMetricEvent](2))
val bc = builder.add(Broadcast[Long](1))
val zip = builder.add(ZipWith[StreamMeasurement, Long, (StreamMeasurement, Long)]((value, ewma) => (value, ewma)))
val merge = builder.add(Merge[Seq[StreamMeasurement]](1))
broadcast.out(1) ~> identityFlow ~> maxFlow ~> bc
val source = Source.fromGraph(GraphDSL.create() { implicit bl =>
SourceShape(bc.out(0))
})
broadcast.out(0) ~> identityFlow ~> topicFlow.groupBy(MAX_SUB_STREAMS, _._1)
.map(_._2)
.zip[Long](source)
.takeWhile(deciderFunction)
.map(_._1)
.fold[Seq[StreamMeasurement]](Seq.empty[StreamMeasurement])((seq, sm) => seq:+sm)
.mergeSubstreams ~> merge
FlowShape(broadcast.in, merge.out)
})
Run Code Online (Sandbox Code Playgroud)
在这里得到的例外情况: …
我是斯卡拉和阿卡的全新人物.我有一个简单的RunnableFlow:
Source -> Flow (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)
现在我想要这样的东西:
Source -> Flow1 (do some transformation) -> Flow2 (do some transformation) -> Sink.runForeach
Run Code Online (Sandbox Code Playgroud)
但Flow2应该等到Flow1中的100个元素可用,然后将这100个元素转换为新元素(需要Flow1中的所有100个元素)并将此新元素提供给Sink.
我做了一些研究并找到了明确的用户定义缓冲区,但我不明白如何从flow2中的flow1访问所有100个元素并使用它们进行一些转换.有人可以解释一下吗?或者甚至更好地发布一个简单的小例子?或两者?