Yan*_*eve 4 asynchronous scala future akka
我编写了以下代码,以便在调用Future中的某些异步工作时尝试确定actor对传入消息的行为(实际上,异步工作是为Slick的异步数据库API建模):
val actor = system.actorOf(Props[WTFAsyncActor])
for (i <- 1 until 100) {
actor ! 'wtf + i.toString
Thread.sleep(200l) // yes I know actors should not invoke this method
}
Thread.sleep(10000l) // same as above
Run Code Online (Sandbox Code Playgroud)
Actor的代码如下:
class WTFAsyncActor extends Actor with ActorLogging{
import scala.concurrent.ExecutionContext.Implicits.global
override def receive: Receive = {
case i@_ =>
log.info(s"[$i] external block pre future")
Thread.`yield`() // just trying to relinquish control to induce context switching
Future {
log.info(s"[$i] internal block pre sleep")
Thread.sleep(1000l) // yeah bad - trying to emulate something meaningful like slick
log.info(s"[$i] internal block post sleep")
}
log.info(s"[$i] external block post future")
}
}
Run Code Online (Sandbox Code Playgroud)
我得到以下日志(摘自示例运行)
10:17:58.408 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block pre future
10:17:58.420 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] external block post future
10:17:58.421 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block pre sleep
10:17:58.599 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block pre future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] external block post future
10:17:58.600 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block pre sleep
10:17:58.800 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block pre future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] external block post future
10:17:58.801 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf3] internal block pre sleep
10:17:59.001 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block pre future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] external block post future
10:17:59.002 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf4] internal block pre sleep
10:17:59.202 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block pre future
10:17:59.206 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] external block post future
10:17:59.402 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block pre future
10:17:59.403 [wtf-async-test-akka.actor.default-dispatcher-2] INFO org.wtf.test.WTFAsyncActor - ['wtf6] external block post future
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf1] internal block post sleep
10:17:59.421 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf6] internal block pre sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf2] internal block post sleep
10:17:59.607 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf5] internal block pre sleep
10:17:59.608 [wtf-async-test-akka.actor.default-dispatcher-3] INFO org.wtf.test.WTFAsyncActor - ['wtf7] external block pre future
Run Code Online (Sandbox Code Playgroud)
我认为可以肯定地说,只要有一些线程可用,无论Future块还没有完成执行,新消息都会被视为它们的来源.我对吗?
我设置要揭示的是接收块是否必须等待 Future块在处理新消息之前完成.好像它没有.(wtf2在wtf1完成对线程3的计算之前进入调度程序线程3,例如)
这种行为有什么警告吗?
请原谅看起来像个愚蠢的问题.我没有看到akka代码库的内幕(我现在对scala和akka来说太新了,目前)
receive在任何Future完成之前,可以处理后续消息.这种行为并没有任何警告,有些事情需要注意.特别是,不要在Future中关闭任何actor的可变状态.Future可以与actor正在处理的消息同时执行,打破了对actor状态的单线程访问的保证,并导致可能使actor的状态处于无效位置的竞争条件.
在想要从Actor中异步启动某些工作的地方,常见的模式是启动子actor来完成工作.子actor可以使用任何结果将消息发送回其父级,并且您确定子actor不会干扰父actor的状态.
| 归档时间: |
|
| 查看次数: |
626 次 |
| 最近记录: |