Tei*_*raz 5 scala future cats-effect
从本教程https://github.com/slouc/concurrency-in-scala-with-ce#threading \nasync 操作分为 3 组,需要显着不同的线程池来运行:
\n\n\n非阻塞异步操作:
\n有界池的线程数量非常少(甚至可能只有一个),但优先级非常高。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程处理请求所花费的时间直接映射到应用程序延迟,因此除了接收通知并将其转发到应用程序的其余部分之外,在此池中不执行任何其他工作非常重要。\n有界池,具有非常大的延迟线程数量较少(甚至可能只有一个),但优先级非常高。这些线程基本上大部分时间都处于空闲状态,并不断轮询是否有新的异步 IO 通知。这些线程处理请求所花费的时间直接映射到应用程序延迟,因此除了接收通知并将其转发到应用程序的其余部分之外,不要在此池中完成其他工作,这一点非常重要。
\n
\n\n阻塞异步操作:
\n无界缓存池。无限制,因为阻塞操作可以(并且将会)阻塞线程一段时间,并且我们希望能够同时服务其他 I/O 请求。缓存是因为创建太多线程可能会耗尽内存,因此重用现有线程很重要。
\n
\n\nCPU 密集型操作:
\n固定池,其中线程数等于 CPU 核心数。这非常简单。回到过去,“黄金法则”是线程数 = CPU 核心数 + 1,但“+1”来自这样一个事实:总是为 I/O 保留一个额外的线程(如上所述,我们现在有单独的池)。
\n
在我的 Cats Effect 应用程序中,我使用基于 Scala Future 的 ReactiveMongo lib 来访问 MongoDB,它在与 MongoDB 通信时不会阻塞线程,例如执行非阻塞 IO。
\n它需要执行上下文。\n猫效果提供默认执行上下文IOApp.executionContext
我的问题是:我应该使用哪个执行上下文来进行非阻塞 io?
\nIOApp.executionContext?
但是,从IOApp.executionContext文档来看:
\n\n为应用程序提供默认的 ExecutionContext。
\nJVM 顶部的默认设置是根据可用 CPU 的数量延迟构建为固定线程池(请参阅 PoolUtils)。
\n
看起来这个执行上下文属于我上面列出的第三组 - CPU-heavy operations (Fixed pool in which number of threads equals the number of CPU cores.),这让我认为 IOApp.executionContext 不是非阻塞 IO 的良好候选者。
我是对的吗?我应该为非阻塞 IO 创建一个带有固定线程池(1 或 2 个线程)的单独上下文(因此它将属于我上面列出的第一组 - Non-blocking asynchronous operations: Bounded pool with a very low number of threads (maybe even just one), with a very high priority.)?
或者是IOApp.executionContext为 CPU 密集型和非阻塞 IO 操作而设计的?
我用来将 Scala Future 转换为 F 并排除执行上下文的函数:
\ndef scalaFutureToF[F[_]: Async, A](\n future: => Future[A]\n )(implicit ec: ExecutionContext): F[A] =\n Async[F].async { cb =>\n future.onComplete {\n case Success(value) => cb(Right(value))\n case Failure(exception) => cb(Left(exception))\n }\n }\nRun Code Online (Sandbox Code Playgroud)\n
在《猫效应 3》中,每个IOApp角色都有Runtime:
final class IORuntime private[effect] (
val compute: ExecutionContext,
private[effect] val blocking: ExecutionContext,
val scheduler: Scheduler,
val shutdown: () => Unit,
val config: IORuntimeConfig,
private[effect] val fiberErrorCbs: FiberErrorHashtable = new FiberErrorHashtable(16)
)
Run Code Online (Sandbox Code Playgroud)
您几乎总是希望保留默认值,而不是胡乱声明自己的运行时,除非在测试或教育示例中。
在您的内部,IOApp您可以compute通过以下方式访问池:
runtime.compute
Run Code Online (Sandbox Code Playgroud)
如果你想执行阻塞操作,那么你可以使用以下blocking结构:
blocking(IO(println("foo!"))) >> IO.unit
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您可以告诉 CE3 运行时此操作可能会被阻塞,因此应该分派到专用池。看这里。
CE2呢?嗯,它有类似的机制,但它们非常笨重,而且还包含很多惊喜。例如,阻塞调用是使用Blocker它来安排的,然后必须以某种方式凭空召唤或通过整个应用程序进行线程化,并且线程池定义是使用尴尬的ContextShift. 如果您有任何选择,我强烈建议您投入一些精力迁移到 CE3。
很好,但是 Reactive Mongo 呢?
ReactiveMongo 使用 Netty(基于 Java NIO API)。而且Netty有自己的线程池。这在 Netty 5 中发生了变化(请参见此处),但 ReactiveMongo 似乎仍在 Netty 4 上(请参见此处)。
但是,您要问的是将执行回调的ExecutionContext线程池。这可以是您的计算池。
让我们看一些代码。首先,你的翻译方法。我只是更改async为,async_因为我使用的是 CE3,并且添加了线程打印行:
def scalaFutureToF[F[_]: Async, A](future: => Future[A])(implicit ec: ExecutionContext): F[A] =
Async[F].async_ { cb =>
future.onComplete {
case Success(value) => {
println(s"Inside Callback: [${Thread.currentThread.getName}]")
cb(Right(value))
}
case Failure(exception) => cb(Left(exception))
}
}
Run Code Online (Sandbox Code Playgroud)
现在假设我们有两个执行上下文 - 一个来自我们的执行IOApp上下文,另一个将代表 ReactiveMongo 用于运行Future. 这是虚构的 ReactiveMongo:
val reactiveMongoContext: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
Run Code Online (Sandbox Code Playgroud)
另一个就是简单的runtime.compute。
现在让我们Future这样定义:
def myFuture: Future[Unit] = Future {
println(s"Inside Future: [${Thread.currentThread.getName}]")
}(reactiveMongoContext)
Run Code Online (Sandbox Code Playgroud)
请注意,我们如何Future通过将 传递reactiveMongoContext给 ReactiveMongo 来假装它在 ReactiveMongo 内部运行。
最后,让我们运行该应用程序:
override def run: IO[Unit] = {
val myContext: ExecutionContext = runtime.compute
scalaFutureToF(myFuture)(implicitly[Async[IO]], myContext)
}
Run Code Online (Sandbox Code Playgroud)
这是输出:
内部未来:[pool-1-thread-1]
内部回调:[io-compute-6]
我们提供的执行上下文scalaFutureToF只是运行回调。Future 本身运行在我们单独的线程池上,该线程池代表 ReactiveMongo 的池。实际上,您无法控制该池,因为它来自 ReactiveMongo 内部。
额外信息
顺便说一句,如果您不使用类型类层次结构 ( F),而是IO直接使用值,那么您可以使用以下简化方法:
def scalaFutureToIo[A](future: => Future[A]): IO[A] =
IO.fromFuture(IO(future))
Run Code Online (Sandbox Code Playgroud)
看看这个甚至不需要您传递一个ExecutionContext- 它只是使用您的计算池。或者更具体地说,它使用为 定义的任何内容def executionContext: F[ExecutionContext],Async[IO]结果是计算池。让我们检查:
override def run: IO[Unit] = {
IO.executionContext.map(ec => println(ec == runtime.compute))
}
// prints true
Run Code Online (Sandbox Code Playgroud)
最后但并非最不重要的:
如果我们确实有办法指定 ReactiveMongo 的底层 Netty 应该使用哪个线程池,那么是的,在这种情况下我们肯定应该使用单独的池。我们永远不应该将我们的runtime.compute池提供给其他运行时。
| 归档时间: |
|
| 查看次数: |
2705 次 |
| 最近记录: |