mme*_*tic 5 multithreading scala akka playframework executioncontext
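I am trying to test how ExecutionContext behaves in a Play application, and I found that I cannot achieve any degree of parallelism when I use the default dispatcher, whether by calling as.dispatcher, by calling as.dispatchers.lookup("akka.actor.default-dispatcher"), or by passing the default execution context as a parameter to my controller class: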
class HomeController @Inject()(cc: ControllerComponents)(implicit ec: ExecutionContext)
I based this on the Play example provided here, and added/changed the following configuration:
routes
GET /futures controllers.HomeController.testFutures(dispatcherId: String)
common.conf
akka {
  my-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4
      parallelism-factor = 2.0
      # 2x vm-cores
      parallelism-max = 8
    }
  }
  actor.default-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4
      parallelism-factor = 2.0
      # 2x vm-cores
      parallelism-max = 8
    }
  }
}
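As a side note on the numbers in this config: as far as I understand the Akka documentation, a fork-join-executor sizes its pool as ceil(available processors * parallelism-factor), clamped between parallelism-min and parallelism-max. The sketch below only reproduces that arithmetic; the object and method names are hypothetical, and the core count is passed in explicitly rather than read from the runtime.

```scala
object PoolSizeSketch {
  // Hypothetical helper: ceil(cores * factor), clamped to [min, max].
  // This mirrors the documented sizing rule; it is not Akka's own code.
  def scaledPoolSize(cores: Int, min: Int, factor: Double, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)
}
```

With the values from the question (4 cores, min 4, factor 2.0, max 8) this gives 8 threads for both dispatchers, so the pool size alone would not explain the sequential output.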
HomeController
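import javax.inject.{Inject, Singleton}
import akka.actor.ActorSystem
import play.api.mvc.{AbstractController, ControllerComponents}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  // `log` is assumed to come from the companion object, which is not shown here
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    // Look up the dispatcher by its config path and use it for every Future below
    implicit val dispatcher: ExecutionContext = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      Thread.sleep(time)
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}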
For some reason, a call to http://localhost:9000/futures?dispatcherId=akka.actor.default-dispatcher (the default dispatcher) is not parallelized and produces the following output:
[info] c.HomeController - Sleeping #0 for 1044 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #1 for 1034 ms
[info] c.HomeController - Awakening #1
[info] c.HomeController - Sleeping #2 for 1031 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #3 for 1065 ms
[info] c.HomeController - Awakening #3
[info] c.HomeController - Sleeping #4 for 1082 ms
[info] c.HomeController - Awakening #4
[info] c.HomeController - Sleeping #5 for 1057 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Sleeping #6 for 1090 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #7 for 1165 ms
[info] c.HomeController - Awakening #7
[info] c.HomeController - Sleeping #8 for 1173 ms
[info] c.HomeController - Awakening #8
[info] c.HomeController - Sleeping #9 for 1034 ms
[info] c.HomeController - Awakening #9
[info] c.HomeController - Sleeping #10 for 1056 ms
[info] c.HomeController - Awakening #10
However, a call to http://localhost:9000/futures?dispatcherId=akka.my-dispatcher (using another dispatcher) is parallelized correctly and produces the following output:
[info] c.HomeController - Sleeping #1 for 1191 ms
[info] c.HomeController - Sleeping #0 for 1055 ms
[info] c.HomeController - Sleeping #7 for 1196 ms
[info] c.HomeController - Sleeping #4 for 1121 ms
[info] c.HomeController - Sleeping #6 for 1040 ms
[info] c.HomeController - Sleeping #2 for 1016 ms
[info] c.HomeController - Sleeping #5 for 1107 ms
[info] c.HomeController - Sleeping #3 for 1165 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #8 for 1002 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #9 for 1127 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #10 for 1016 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Awakening #4
[info] c.HomeController - Awakening #3
[info] c.HomeController - Awakening #1
[info] c.HomeController - Awakening #7
[info] c.HomeController - Awakening #8
[info] c.HomeController - Awakening #10
[info] c.HomeController - Awakening #9
Any idea why this happens?
小智 2
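I think this behavior comes from akka.actor.default-dispatcher being a BatchingExecutor, which tries to optimize operations such as map/flatMap by running them on the same thread to avoid unnecessary scheduling. When we intend to block, we can give it a hint with scala.concurrent.blocking(Thread.sleep(time)). The hint goes through the BlockContext held in a ThreadLocal[BlockContext], which signals the intent to block, so the optimization is not applied and the pending operations are handed to another thread instead.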
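To make the batching idea concrete, here is a toy executor that keeps tasks submitted from inside a running task on the same thread. It is only an illustration of the mechanism described above, not Akka's BatchingExecutor; the names ToyBatchingExecutor and ToyBatchingDemo are made up, and the real implementation also handles the blocking hint, which this sketch leaves out.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executor, Executors, TimeUnit}
import scala.collection.mutable

// Toy illustration only: NOT Akka's BatchingExecutor.
final class ToyBatchingExecutor(underlying: Executor) extends Executor {
  // Queue of the batch currently running on this thread, or null if none.
  private val currentBatch = new ThreadLocal[mutable.Queue[Runnable]]

  override def execute(task: Runnable): Unit = {
    val batch = currentBatch.get()
    if (batch != null) {
      // Submitted from inside a running batch: append instead of rescheduling.
      batch.enqueue(task)
    } else {
      // No batch on this thread: start one on the underlying pool.
      underlying.execute(new Runnable {
        def run(): Unit = {
          val local = mutable.Queue[Runnable](task)
          currentBatch.set(local)
          try {
            while (local.nonEmpty) local.dequeue().run()
          } finally {
            currentBatch.remove()
          }
        }
      })
    }
  }
}

object ToyBatchingDemo {
  // Submits n tasks from inside a task that already runs on the executor
  // and returns how many distinct threads ended up running them.
  def distinctThreads(n: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val executor = new ToyBatchingExecutor(pool)
    val names = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(n)
    try {
      executor.execute(new Runnable {
        def run(): Unit =
          (0 until n).foreach { _ =>
            executor.execute(new Runnable {
              def run(): Unit = {
                names.add(Thread.currentThread().getName)
                done.countDown()
              }
            })
          }
      })
      done.await(10, TimeUnit.SECONDS)
      new java.util.HashSet[String](names).size
    } finally {
      pool.shutdown()
    }
  }
}
```

Even though the pool has four threads, all inner tasks run one after another on the thread that submitted them. That would match the question's setup if the controller action itself runs on the default dispatcher, while a different dispatcher has no batch in progress on that thread.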
If you change the line Thread.sleep(time) to scala.concurrent.blocking(Thread.sleep(time)), you will get the desired behavior:
@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    implicit val dispatcher: ExecutionContext = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      // Mark the sleep as blocking so the executor does not keep the other tasks queued behind it
      scala.concurrent.blocking(Thread.sleep(time))
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}
[info] play.api.Play - Application started (Dev) (no global state)
Sleeping #0 for 1062 ms
Sleeping #1 for 1128 ms
Sleeping #2 for 1189 ms
Sleeping #3 for 1105 ms
Sleeping #4 for 1169 ms
Sleeping #5 for 1178 ms
Sleeping #6 for 1057 ms
Sleeping #7 for 1003 ms
Sleeping #8 for 1164 ms
Sleeping #9 for 1029 ms
Sleeping #10 for 1005 ms
Awakening #7
Awakening #10
Awakening #9
Awakening #6
Awakening #0
Awakening #3
Awakening #1
Awakening #8
Awakening #4
Awakening #5
Awakening #2
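The same hint can be tried outside Play and Akka. The following is a minimal sketch on Scala's global execution context, assuming nothing beyond the standard library; BlockingHintDemo and its parameters are hypothetical names chosen for the example.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingHintDemo {
  // Starts n sleeping tasks and returns their indices once all have finished.
  def run(n: Int, sleepMs: Long): Seq[Int] = {
    val tasks = (0 until n).map { i =>
      Future {
        // Tell the execution context that this thread is about to park.
        blocking(Thread.sleep(sleepMs))
        i
      }
    }
    Await.result(Future.sequence(tasks), 30.seconds)
  }
}
```

Calling BlockingHintDemo.run(11, 1000) mirrors the eleven futures from the question. How much the tasks overlap depends on the execution context and the machine, so the sketch only returns the results and makes no timing claims.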