我正在寻找C/CPP中工作窃取队列的正确实现.我环顾了谷歌,但没有找到任何有用的东西.
也许有人熟悉一个好的开源实现?(我不想实施原始学术论文中的伪代码).
我想Runnable通过一种方法将任务提交到ForkJoinPool:
forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)
注意,我使用的是JDK 7.
在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.
题:
如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?
在这种情况下值得吗?
更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.
更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:
当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.
我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.
更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.
尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.
以下是否正确?
(当涉及多个生产者(即CAS操作)时,破坏者模式是否仍然比其他无锁多生产者多消费者队列(例如来自提升)快得多?)
我的情况详细:
处理条目可以产生几个新条目,这些条目也必须最终处理.性能具有最高优先级,以FIFO顺序处理的条目具有第二优先级.
在当前实现中,每个线程使用本地FIFO,在其中添加新条目.空闲线程从其他线程的本地FIFO中窃取工作.线程处理之间的依赖关系使用无锁,机械同情的哈希表(写入时的CAS,具有桶粒度)来解决.这导致相当低的争用,但FIFO顺序有时会被破坏.
使用干扰模式可以保证FIFO顺序.但是不会将条目分配到线程上导致更高的争用(例如,读取游标上的CAS),而不是工作窃取的本地FIFO(每个线程的吞吐量大致相同)?
我发现的参考文献
关于破坏者的标准技术论文(第5章+6)中的性能测试不包括不相交的工作分布.
https://groups.google.com/forum/?fromgroups=#!topic/lmax-disruptor/tt3wQthBYd0是我在disruptor +偷窃工作中发现的唯一参考.它声明如果存在任何共享状态,每个线程的队列会显着减慢,但不会详细说明或解释原因.我怀疑这句话适用于我的情况:
concurrency concurrent-programming disruptor-pattern work-stealing
我一直在研究我正在实现的线程池的不同调度算法.由于我正在解决的问题的性质,我可以假设并行运行的任务是独立的,不会产生任何新任务.任务可以是不同的大小.
我立即采用最流行的调度算法"偷工作",使用无锁的deques作为本地工作队列,我对这种方法比较满意.但是我想知道是否有任何常见的情况,工作窃取不是最好的方法.
对于这个特殊问题,我对每个单独任务的大小有一个很好的估计.工作窃取没有利用这些信息,我想知道是否有任何调度程序可以提供更好的负载平衡,而不是工作窃取这些信息(显然具有相同的效率).
NB.这个问题与之前的问题有关.
algorithm multithreading load-balancing scheduling work-stealing
在我的项目中,我正在构建一个Java执行框架,用于接收来自客户端的工作请求.工作(不同大小)被分解为一组任务,然后排队等待处理.有单独的队列来处理每种类型的任务,每个队列都与ThreadPool相关联.ThreadPools的配置方式使得引擎的整体性能最佳.
这种设计有助于我们有效地平衡请求,并且大量请求不会最终占用系统资源.但是,当某些队列为空且其各自的线程池处于空闲状态时,解决方案有时会失效.
为了使这更好,我正在考虑实现一个工作/任务窃取技术,以便负载很重的队列可以从其他ThreadPools获得帮助.但是,这可能需要实现我自己的Executor,因为Java不允许多个队列与ThreadPool相关联,并且不支持工作窃取概念.
阅读有关Fork/Join的信息,但这似乎不适合我的需求.构建此解决方案的任何建议或替代方法都非常有用.
谢谢安迪
是否有与Java 8一起使用的待处理任务队列Executors.newWorkStealingPool()?
例如,假设#available核心为2,并且Executors.newWorkStealingPool()为空,因为2个任务已在运行.那么如果第三个任务被提交给工作窃取执行者会发生什么?排队了吗?如果是,那么在队列中有什么限制?
提前致谢.
你好,我认为使用 CompletableFuture 和默认设置ForkJoinPool我可以优化任务的执行而不是经典,ExecutorService但我错过了一些东西
使用此代码执行需要 1 秒,我有 3 个工作线程:
for (int i = 0; i < 3; i++) {
final int counter = i;
listTasks.add(CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
System.out.println("Looking up " + counter + " on thread " + Thread.currentThread().getName());
return null;
}));
}
Run Code Online (Sandbox Code Playgroud)
好吧,看起来很正常。
但是使用此代码,需要 3 秒:
for (int i = 0; i < 9; i++) {
final int counter = i;
listTasks.add(CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
System.out.println("Looking up " + counter + " on thread " …Run Code Online (Sandbox Code Playgroud) java multithreading asynchronous work-stealing completable-future
我想改进我的fork/join小例子,以表明在Java Fork/Join框架执行工作中发生窃取.
我需要对代码进行哪些更改?示例目的:只需对多个线程之间的值分解进行线性研究.
package com.stackoverflow.questions;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> {
private static final long serialVersionUID = 5075739389907066763L;
private List<T> _list;
private T _test;
private int _lastCount = -1;
private int _start;
private int _end;
private int _divideFactor = 4;
private static final int THRESHOLD = 20;
public CounterFJ(List<T> list, T test, int start, int end, int factor) {
_list = list;
_test = test;
_start = start; …Run Code Online (Sandbox Code Playgroud) 我正在尝试维护多个线程中的项目列表,每个线程一个(例如,每个线程说一个套接字连接)。我在维护此列表ArrayDeque<>。我面临的问题ArrayDeque<>是超过项目数超过项目数。线程池中的线程数。
这是我的代码:
package com.practice;
import java.util.ArrayDeque;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CompletableFutureApp implements AutoCloseable {
private static void sleep(long timeMS) {
try {
Thread.sleep(timeMS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try (CompletableFutureApp completableFutureApp = new CompletableFutureApp()) {
Runnable[] tasksList1 = new Runnable[100];
for (int i = 0; i < tasksList1.length; i++) {
String msg1 = "TaskList 1 no.: " + i; …Run Code Online (Sandbox Code Playgroud) 我正在阅读一篇关于并发运行时的文章,本文中提到了算法work stealing.但我不知道这个算法是什么!所以我想要一些解释或一些好的链接,可以帮助我做一个关于这个算法的演示文稿.
work-stealing ×10
java ×6
algorithm ×3
fork-join ×2
queue ×2
arraydeque ×1
asynchronous ×1
c++ ×1
concurrency ×1
executor ×1
forkjoinpool ×1
java-8 ×1
scheduling ×1
threadpool ×1