我在这里查看文档:http://doc.akka.io/docs/akka/2.3.3/java/dispatchers.html
我们以这样的方式使用Akka,我们为不同的actor提供了两个独立的调度程序(默认的fork-join执行程序).我们现在遇到一些性能问题,我们正在研究如何调整调度程序配置参数并查看它们如何影响应用程序的性能.
我查看了文档,但并不真正了解配置参数.例如,仅针对简单的默认值,fork-join-executor调度程序:
这些是什么以及如何配置它们以了解它们如何影响应用程序性能?
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
Run Code Online (Sandbox Code Playgroud)
谢谢!
我们在负载测试和扩展akka应用程序方面取得了一些进展,但我们看到scala.concurrent.forkjoin.ForkJoinPool.scan()在visualvm中成为第二大热点,约占自我时间的20%.自我时间(CPU)列仅表示其中的一小部分(小于自身时间列值的1%).
我怀疑这意味着阻塞或上下文切换可能存在问题,但我不太确定 - 任何人都可以提供洞察力吗?如果它是上下文切换我猜测调度调度程序吞吐量到更高的数量可能会增加我们的收益,否则如果它是由阻塞引起的,我们需要更多地读取代码.
任何见解都非常感激.
通常当使用Java 8的parallelStream()时,结果是通过默认的公共fork-join池(即ForkJoinPool.commonPool())执行.
然而,如果一个人的工作远离CPU限制,例如可能在很多时候都在等待IO,那么这显然是不可取的.在这种情况下,人们会想要使用一个单独的池,根据其他标准确定大小(例如,任务可能实际使用CPU的时间有多少).
有没有明显得到parallelStream()使用不同池的方式,但作为一个详细的方式在这里.
不幸的是,这种方法需要从fork-join池线程调用并行流上的终端操作.这样做的缺点是,如果目标分支连接池完全忙于现有工作,整个执行将等待它,而什么都不做.因此,池可能成为比单线程执行更糟糕的瓶颈.相反,当以"正常"方式使用parallelStream()时,使用ForkJoinPool.common.externalHelpComplete()或ForkJoinPool.common.tryExternalUnpush()来让池外的调用线程帮助处理.
有谁知道的方式既获得parallelStream()使用非默认的fork-join池,并有从的fork-join池帮助外的调用线程在这项工作中的处理(但fork-不休息加入游泳池的工作)?
在Java 8中,有两种方法可以启动异步计算 - CompletableFuture和ForkJoinTask.它们看起来都非常相似 - 内部类CompletableFuture甚至延伸ForkJoinTask.
是否有理由使用其中一个?
我可以看到的一个关键区别是该CompletableFuture.join方法只是阻塞直到将来完成(waitingGet只是使用a旋转ManagedBlocker),而a ForkJoinTask.join可以从队列中窃取工作以帮助您加入的任务完成.
有没有超过一个或另一个的好处?
我想了解Java fork-join池中处理任务的顺序.
到目前为止,我在文档中找到的唯一相关信息是关于一个名为"asyncMode"的参数,如果此池对从未加入的分叉任务使用本地先进先出调度模式,则为"true" .
我对这个陈述的解释是每个工人都有自己的任务队列; 工人从他们自己的队列前面接受任务,或者如果他们自己的队列是空的,就偷走其他工人队列的后面; 如果asyncMode为true(resp.false),worker会将新分叉的任务添加到自己队列的后面(resp.front).
如果我的解释错误,请纠正我!
现在,这提出了几个问题:
1)加入的分叉任务的顺序是什么?
我的猜测是,当一个任务被分叉时,它会被添加到工作者的队列中,如上面的解释所述.现在,假设任务已加入......
如果在调用join时,任务尚未启动,则工作者调用join将把任务拉出队列并立即开始处理它.
如果在调用join时,该任务已经被另一个worker偷走了,那么调用join的worker将同时处理其他任务(按照我上面的解释中描述的获取任务的顺序),直到它的任务为止.加入已被偷走的工人完成了.
这种猜测基于使用print语句编写简单的测试代码,并观察改变连接调用顺序的方式会影响处理任务的顺序.有人可以告诉我,我的猜测是否正确?
2)外部提交的任务的顺序是什么?
根据此问题的答案,fork-join池不使用外部队列.(顺便说一下,我正在使用Java 8.)
所以我要理解,当外部提交任务时,任务会被添加到随机选择的工作队列中吗?
如果是这样,外部提交的任务是否添加到队列的后面或前面?
最后,这取决于是通过调用pool.execute(任务)还是通过调用pool.invoke(task)来提交任务?这取决于调用pool.execute(task)或pool.invoke(task)的线程是外部线程还是此fork-join池中的线程?
Akka docs表示默认调度程序是一个fork-join-executor因为它"在大多数情况下都能提供出色的性能".
我想知道为什么会这样?
ForkJoinPool与其他类型的ExecutorService的不同之处主要在于使用工作窃取:池中的所有线程都尝试查找和执行提交给池的任务和/或由其他活动任务创建的任务(如果不存在则最终阻止等待工作) .这使得(1)在大多数任务产生其他子任务(如大多数ForkJoinTasks)时有效处理,以及(2)当许多小任务从外部客户端提交到池时.特别是在构造函数中将asyncMode设置为true时,ForkJoinPools也可以(3)适用于从未加入的事件样式任务.
起初,我猜Akka不是案例(1)的一个例子,因为我无法弄清楚Akka如何分配任务,我的意思是,在许多任务中可以分叉的任务是什么?
我认为每条消息都是一个独立的任务,这就是为什么我认为Akka与case(2)类似,其中消息是许多小任务(通过!和?)提交给ForkJoinPool.
下一个问题虽然与akka没有严格关系,但是,为什么ForkJoinPool fork_joinPool会使fork和join(允许工作窃取的主要功能)的用例仍然没有被使用?
从Fork Join Pool的可扩展性
我们注意到上下文切换的数量异常,超过每秒70000.
那一定是问题所在,但究竟是什么原因造成的呢?Viktor提出了合格的猜测,它必须是线程池执行器的任务队列,因为它是共享的,并且LinkedBlockingQueue中的锁可能会在存在争用时生成上下文切换.
但是,如果Akka不使用ForkJoinTasks,则外部客户端提交的所有任务都将在共享队列中排队,因此争用应与in中的相同ThreadPoolExecutor.
所以,我的问题是:
ForkJoinTasks(案例(1))还是与案例(2)有关?ForkJoinPool如果外部客户端提交的所有任务都被推送到共享队列并且不会发生工作窃取,为什么在情况(2)中是有益的?正确答案来自johanandren,但我想补充一些亮点.
现在,在(IIRC)JDK 7u12之前,ForkJoinPool有一个全局提交队列.当工作线程用尽本地任务以及窃取任务时,他们到达那里并试图查看外部工作是否可用.在这种设计中,对于由ArrayBlockingQueue支持的常规ThreadPoolExecutor没有任何优势.[...]
现在,外部提交进入其中一个提交队列.然后,没有工作的工作人员可以首先查看与特定工作者关联的提交队列,然后四处寻找其他人的提交队列.人们也可以称之为"偷工作".
因此,这可以在未使用fork join的情况下窃取工作.正如Doug Lea所说
当许多客户提交大量任务时,吞吐量大大提高.(我已经测量了高达60倍的speedupson微基准测试).我们的想法是以与工人类似的方式对待外部提交者 - 使用随机排队和转发.(这需要一个大的内部重构todisassociate工作队列和工作人员.)这也大大提高了所有任务异步并提交到池而不是分叉的吞吐量,这成为构建actor框架的合理途径,以及许多你可能使用的普通服务ThreadPoolExecutor for.
对于FJP来说,4%确实不多.你还需要注意FJP的权衡:FJP让线程旋转一段时间,以便能够更快地处理准时到达的工作.这确保了在许多情况下的良好延迟.特别是如果您的池过度配置,那么在几乎空闲的情况下,权衡时间与更多功耗有关.
我对ExecutorService和 的内部调度机制有点困惑ForkJoinPool。
我了解ExecutorService调度是通过这种方式完成的。
一堆任务在排队。一旦线程可用,它将处理第一个可用任务,依此类推。
同时, aForkJoinPool被表示为不同的,因为它使用了工作窃取算法。如果我理解正确,这意味着一个线程可以从另一个线程窃取一些任务。
然而,我并不真正理解在ExecutorService和 中实现的机制之间的区别ForkJoinPool。根据我的理解,这两种机制都应该尽可能减少每个线程的空闲时间。
我会理解,如果在 a 的情况下ExecutorService,每个线程都有自己的队列。然而,情况并非如此,因为队列由池的不同线程共享......
任何澄清将是非常受欢迎的!
来自java docs,
ForkJoinPool与其他类型的ExecutorService的不同之处主要在于使用工作窃取:池中的所有线程都尝试查找和执行由其他活动任务创建的子任务(如果不存在则最终阻塞等待工作).
这可以在大多数任务产生其他子任务时实现高效处理(与大多数ForkJoinTasks一样).在构造函数中将asyncMode设置为true时,ForkJoinPools也可能适用于从未加入的事件样式任务.
通过下面的ForkJoinPool示例,与ThreadPoolExecutor不同,我没有看到设置队列大小的参数.我没有弄清楚ForkJoinPool如何窃取机制.
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3000), threadFactory, rejectionHandler);
Run Code Online (Sandbox Code Playgroud)
假设我已经创建了具有10个线程的ThreadPoolExecutor,并且已经提交了3000个Callable任务.这些线程如何共享子任务的执行负载?
并且ForkJoin池如何针对相同的用例表现不同?
java multithreading java.util.concurrent threadpoolexecutor forkjoinpool
我们可以向 提交两种类型的任务forkJoinPool。一个是RecursiveAction,另一个是RecursiveTask。
它们之间有什么区别?
我使用 Java 16 通过 HTTP 向 API 发出请求。为了整体加快速度,我将其加载到自定义ForkJoinPool. 我在下面编译了一个重现示例。
自从迁移到 Java 17(openjdk build 17.0.1+12-39)后,这会抛出 RejectedExecutionException:
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud)
为什么会出现这种情况?ForkJoinPool 是否发生了我不知道的变化?
代码
Caused by: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
at java.base/java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:1819)
at java.base/java.util.concurrent.ForkJoinPool.compensatedBlock(ForkJoinPool.java:3446)
at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3432)
at java.base/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072)
at java.net.http/jdk.internal.net.http.HttpClientImpl.send(HttpClientImpl.java:553)
at java.net.http/jdk.internal.net.http.HttpClientFacade.send(HttpClientFacade.java:119)
at Test.lambda$retrieveMany$1(Test.java:30)
Run Code Online (Sandbox Code Playgroud) forkjoinpool ×10
java ×9
akka ×3
fork-join ×3
cpu-usage ×1
future ×1
java-17 ×1
java-8 ×1
java-stream ×1
openjdk-17 ×1
scalability ×1
threadpool ×1