dk1*_*k14 3 multithreading scala future akka
我们正在为每个(小)传入的消息组创建一个actor链,以保证它们的顺序处理和管道(组通过公共id区分).问题是,我们的链叉,就像A1 -> (A2 -> A3 | A4 -> A5)我们要保证消息经历之间没有种族A2 -> A3和A4 -> A5.传统的解决方案是阻止A1actor直接当前消息被完全处理(在一个子链中):
def receive { //pseudocode
case x => ...
val f = A2orA4 ? msg
Await.complete(f, timeout)
}
Run Code Online (Sandbox Code Playgroud)
因此,应用程序中的线程数与处理中的消息数成正比,无论这些消息是活动的还是异步等待来自外部服务的某些响应.它使用fork-join(或任何其他动态)池工作大约两年,但当然不能使用固定池,并且在高负载的情况下极大地降低性能.更重要的是,它会影响GC,因为每个被阻塞的fork-actor都会保留冗余的先前消息的状态.
即使使用背压,它也会创建比收到的消息多N倍的线程(因为流中有N个顺序叉),这仍然很糟糕,因为一条消息的处理需要很长时间但CPU不多.所以我们应该处理更多的消息,因为我们有足够的内存.我提出的第一个解决方案 - 将链条线性化A1 -> A2 -> A3 -> A4 -> A5.还有更好的吗?
更简单的解决方案是将最后收到的消息的未来存储到actor的状态,并将其与之前的未来联系起来:
def receive = process(Future{new Ack}) //completed future
def process(prevAck: Future[Ack]): Receive = { //pseudocode
case x => ...
context become process(prevAck.flatMap(_ => A2orA4 ? msg))
}
Run Code Online (Sandbox Code Playgroud)
所以它将创造一个没有任何阻碍的期货链.期货完成后,该连锁店将被删除(最后一个除外).