Edg*_*erg 6 java concurrency multithreading scala executioncontext
出于某种原因,我无法解决这个问题.我有一个运行Play的应用程序调用Elastic Search.作为我的设计的一部分,我的服务使用包含scala future的Java API,如本博文中所示.我已经更新了该帖子中的代码以提示ExecutionContext,它将执行一些阻塞I/O,如下所示:
import scala.concurent.{blocking, Future, Promise}
import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
blocking {
request.execute(this)
promise.future
}
}
Run Code Online (Sandbox Code Playgroud)
我构建查询以发送给ES的实际服务将executionContext作为构造函数参数,然后用于调用弹性搜索.我这样做是为了让播放使用的全局执行上下文不会让它的线程被ES的阻塞调用所束缚.这个SO评论提到只有全局上下文才能阻塞,所以这让我不得不创建自己的.在同一个帖子/答案中有很多关于使用ForkJoin池的信息,但是我不确定如何使用这些文档中的内容并将其与阻塞文档中的提示相结合来创建响应阻塞的执行上下文提示.
我认为我遇到的一个问题是,我不确定如何首先如何回应阻止上下文?我正在阅读最佳实践,它使用的示例是一个无限的线程缓存:
请注意,在这里我更喜欢使用无限制的"缓存线程池",因此它没有限制.在阻塞I/O时,我们必须拥有足够的线程来阻止I/O. 但是如果无界限太多,取决于用例,你可以稍后对其进行微调,这个样本的想法是你得到了滚动.
那么这是否意味着我的ForkJoin支持的线程池,我应该尝试在处理非阻塞I/O时使用缓存线程并创建一个新线程来阻止IO?或者是其他东西?几乎我在网上找到的关于使用单独线程池的每个资源都倾向于执行新手指南所做的事情,也就是说:
如何调整各种线程池在很大程度上取决于您的个人应用程序,超出了本文的范围.
我知道这取决于你的应用程序,但在这种情况下,如果我只想创建某种类型的阻塞感知ExecutionContext并理解管理线程的合适策略.如果Context专门用于应用程序的单个部分,那么我应该只是制作一个固定的线程池大小而不是首先使用/忽略该blocking关键字吗?
我倾向于絮絮叨叨,所以我会试着在答案中分解我正在寻找的东西:
blocking那里的关键部分.对不起,这里有一个很长的问题,我只是想让你了解我正在看的东西,并且我一直试图绕过这一天超过一天需要一些外界的帮助.
编辑:为了清楚起见,ElasticSearch Service的构造函数签名是:
//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)
Run Code Online (Sandbox Code Playgroud)
在我的应用程序启动代码我有这样的事情:
object Global extends GlobalSettings {
val elasticSearchContext = //Custom Context goes here
...
val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
...
}
Run Code Online (Sandbox Code Playgroud)
我也正在阅读Play对上下文的建议,但还没有看到关于阻止提示的任何内容,我怀疑我可能要查看源代码以查看它们是否扩展了BlockContext特征.
因此,我深入研究了文档,并且针对我正在处理的情况,Play 的最佳实践是
\n\n\n\n\n在某些情况下,您可能希望将工作分派给其他线程池。这可能包括 CPU 繁重工作或 IO 工作,例如数据库访问。为此,您应该首先创建一个线程池,这可以在 Scala 中轻松完成:
\n
并提供了一些代码:
\n\nobject Contexts {\n implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")\n}\nRun Code Online (Sandbox Code Playgroud)\n\n上下文来自 Akka,所以我跑到那里搜索他们提供的默认值和上下文类型,这最终引导我找到了关于调度程序的文档。默认是ForkJoinPool,其管理块的默认方法是调用managedBlock(blocker). 这让我阅读了文档,其中指出:
\n\n\n根据给定的阻止程序进行阻止。如果当前线程是 ForkJoinWorkerThread,则此方法可能会在必要时安排激活一个备用线程,以确保当前线程被阻塞时有足够的并行性。
\n
所以看起来如果我有一个ForkJoinWorkerThread那么我认为我想要的行为就会发生。进一步查看 ForkJoinPool 的源代码,我注意到默认的线程工厂是:
val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory\nRun Code Online (Sandbox Code Playgroud)\n\n这对我来说意味着,如果我使用 Akka 中的默认值,我将获得一个以我期望的方式处理阻塞的上下文。
\n\n因此,再次阅读 Akka 文档,似乎指定我的上下文是这样的:
\n\nmy-context {\n type = Dispatcher\n executor = "fork-join-executor"\n fork-join-executor {\n parallelism-min = 8\n parallelism-factor = 3.0\n parallelism-max = 64\n task-peeking-mode = "FIFO"\n }\n throughput = 100\n}\nRun Code Online (Sandbox Code Playgroud)\n\n这就是我想要的。
\n\n当我在源代码中搜索时,我做了一些查找调用blocking或调用的用途,并找到了一个覆盖ThreadPoolBuildermanagedBlock中 ForkJoin 行为的示例
private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {\n override def blockOn[T](thunk: \xe2\x87\x92 T)(implicit permission: CanAwait): T = {\n val result = new AtomicReference[Option[T]](None)\n ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {\n def block(): Boolean = {\n result.set(Some(thunk))\n true\n }\n def isReleasable = result.get.isDefined\n })\n result.get.get // Exception intended if None\n }\n }\nRun Code Online (Sandbox Code Playgroud)\n\n这似乎是我最初要求的如何制作实现 BlockContext 的示例。该文件还包含显示如何创建 ExecutorServiceFactory 的代码,我相信这是executor配置部分所引用的内容。所以我想如果我想要拥有完全自定义的上下文,我会做的就是扩展某种类型的 WorkerThread 并编写我自己的 ExecutorServiceFactory ,它使用自定义的工作线程,然后在属性中指定完全限定的类名,就像这篇文章建议的那样。
我可能会选择使用 Akka 的 forkjoin :)
\n| 归档时间: |
|
| 查看次数: |
857 次 |
| 最近记录: |