标签: work-stealing

在C/C++中实现工作窃取队列?

我正在寻找C/CPP中工作窃取队列的正确实现.我环顾了谷歌,但没有找到任何有用的东西.

也许有人熟悉一个好的开源实现?(我不想实施原始学术论文中的伪代码).

c++ algorithm queue multithreading work-stealing

34
推荐指数
2
解决办法
1万
查看次数

Java ForkJoinPool具有非递归任务,是否可以正常工作?

我想Runnable通过一种方法将任务提交到ForkJoinPool:

forkJoinPool.submit(Runnable task)
Run Code Online (Sandbox Code Playgroud)

注意,我使用的是JDK 7.

在引擎盖下,它们被转换为ForkJoinTask对象.我知道当一个任务以递归方式分成较小的任务时,ForkJoinPool是有效的.

题:

如果没有递归,窃取工作仍然可以在ForkJoinPool中工作吗?

在这种情况下值得吗?

更新1: 任务很小,可能不平衡.即使对于严格相同的任务,诸如上下文切换,线程调度,停车,页面未命中等事情也会妨碍导致不平衡.

更新2: Doug Lea在Concurrency JSR-166兴趣小组中写道,给出了一个暗示:

当所有任务都是异步并提交到池而不是分叉时,这也极大地提高了吞吐量,这成为构造actor框架的合理方法,以及许多您可能使用ThreadPoolExecutor的普通服务.

我认为,当涉及到相当小的CPU绑定任务时,由于这种优化,ForkJoinPool是可行的方法.重点是这些任务已经很小,不需要递归分解.工作窃取工作,无论是大型还是小型任务 - 任务都可以被来自忙碌工人的Deque尾巴的另一个自由工作者抓住.

更新3: ForkJoinPool的可扩展性 - Akka乒乓球队的基准测试显示了很好的结果.

尽管如此,要更有效地应用ForkJoinPool需要进行性能调整.

java multithreading fork-join work-stealing forkjoinpool

25
推荐指数
1
解决办法
2512
查看次数

什么时候使用破坏者模式和当工作窃取本地存储?

以下是否正确?

  • 破坏者模式具有更好的并行性能和可扩展性,如果每个条目有多种方式(IO操作或注释)进行处理,因为可以使用多个消费者不争进行并行化.
  • 相反,工作窃取(即在本地存储条目和从其他线程窃取条目)具有更好的并行性能和可伸缩性,如果每个条目必须仅以单一方式处理,因为在干扰模式中将条目不相交地分配到多个线程会导致争用.

(当涉及多个生产者(即CAS操作)时,破坏者模式是否仍然比其他无锁多生产者多消费者队列(例如来自提升)快得多?)


我的情况详细:

处理条目可以产生几个新条目,这些条目也必须最终处理.性能具有最高优先级,以FIFO顺序处理的条目具有第二优先级.

在当前实现中,每个线程使用本地FIFO,在其中添加新条目.空闲线程从其他线程的本地FIFO中窃取工作.线程处理之间的依赖关系使用无锁,机械同情的哈希表(写入时的CAS,具有桶粒度)来解决.这导致相当低的争用,但FIFO顺序有时会被破坏.

使用干扰模式可以保证FIFO顺序.但是不会将条目分配到线程上导致更高的争用(例如,读取游标上的CAS),而不是工作窃取的本地FIFO(每个线程的吞吐量大致相同)?


我发现的参考文献

关于破坏者的标准技术论文(第5章+6)中的性能测试不包括不相交的工作分布.

https://groups.google.com/forum/?fromgroups=#!topic/lmax-disruptor/tt3wQthBYd0是我在disruptor +偷窃工作中发现的唯一参考.它声明如果存在任何共享状态,每个线程的队列会显着减慢,但不会详细说明或解释原因.我怀疑这句话适用于我的情况:

  • 使用无锁哈希表解析共享状态;
  • 必须在消费者之间不相交地分发条目;
  • 除了工作窃取之外,每个线程只在其本地队列中进行读写.

concurrency concurrent-programming disruptor-pattern work-stealing

11
推荐指数
1
解决办法
2267
查看次数

工作窃取始终是最合适的用户级线程调度算法吗?

我一直在研究我正在实现的线程池的不同调度算法.由于我正在解决的问题的性质,我可以假设并行运行的任务是独立的,不会产生任何新任务.任务可以是不同的大小.

我立即采用最流行的调度算法"偷工作",使用无锁的deques作为本地工作队列,我对这种方法比较满意.但是我想知道是否有任何常见的情况,工作窃取不是最好的方法.

对于这个特殊问题,我对每个单独任务的大小有一个很好的估计.工作窃取没有利用这些信息,我想知道是否有任何调度程序可以提供更好的负载平衡,而不是工作窃取这些信息(显然具有相同的效率).

NB.这个问题与之前的问题有关.

algorithm multithreading load-balancing scheduling work-stealing

10
推荐指数
1
解决办法
3690
查看次数

工作/任务窃取ThreadPoolExecutor

在我的项目中,我正在构建一个Java执行框架,用于接收来自客户端的工作请求.工作(不同大小)被分解为一组任务,然后排队等待处理.有单独的队列来处理每种类型的任务,每个队列都与ThreadPool相关联.ThreadPools的配置方式使得引擎的整体性能最佳.

这种设计有助于我们有效地平衡请求,并且大量请求不会最终占用系统资源.但是,当某些队列为空且其各自的线程池处于空闲状态时,解决方案有时会失效.

为了使这更好,我正在考虑实现一个工作/任务窃取技术,以便负载很重的队列可以从其他ThreadPools获得帮助.但是,这可能需要实现我自己的Executor,因为Java不允许多个队列与ThreadPool相关联,并且不支持工作窃取概念.

阅读有关Fork/Join的信息,但这似乎不适合我的需求.构建此解决方案的任何建议或替代方法都非常有用.

谢谢安迪

java executor threadpool work-stealing

7
推荐指数
1
解决办法
5511
查看次数

在Java 8中,Executors.newWorkStealingPool()是否也提供了一个任务队列?

是否有与Java 8一起使用的待处理任务队列Executors.newWorkStealingPool()

例如,假设#available核心为2,并且Executors.newWorkStealingPool()为空,因为2个任务已在运行.那么如果第三个任务被提交给工作窃取执行者会发生什么?排队了吗?如果是,那么在队列中有什么限制?

提前致谢.

java multithreading executorservice java-8 work-stealing

5
推荐指数
1
解决办法
4624
查看次数

在 ForkJoinpool 上使用 CompletableFuture 并避免线程等待

你好,我认为使用 CompletableFuture 和默认设置ForkJoinPool我可以优化任务的执行而不是经典,ExecutorService但我错过了一些东西

使用此代码执行需要 1 秒,我有 3 个工作线程:

for (int i = 0; i < 3; i++) {
    final int counter = i;
    listTasks.add(CompletableFuture.supplyAsync(() -> {
        Thread.sleep(1000);
        System.out.println("Looking up " + counter + " on thread " + Thread.currentThread().getName());
        return null;
    }));
}
Run Code Online (Sandbox Code Playgroud)

好吧,看起来很正常。

但是使用此代码,需要 3 秒:

for (int i = 0; i < 9; i++) {
    final int counter = i;
    listTasks.add(CompletableFuture.supplyAsync(() -> {
        Thread.sleep(1000);
        System.out.println("Looking up " + counter + " on thread " …
Run Code Online (Sandbox Code Playgroud)

java multithreading asynchronous work-stealing completable-future

5
推荐指数
1
解决办法
5502
查看次数

如何在Java Fork/Join框架中显示工作窃取?

我想改进我的fork/join小例子,以表明在Java Fork/Join框架执行工作中发生窃取.

我需要对代码进行哪些更改?示例目的:只需对多个线程之间的值分解进行线性研究.

package com.stackoverflow.questions;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CounterFJ<T extends Comparable<T>> extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 5075739389907066763L;
    private List<T> _list;
    private T _test;
    private int _lastCount = -1;
    private int _start;
    private int _end;
    private int _divideFactor = 4;

    private static final int THRESHOLD = 20;

    public CounterFJ(List<T> list, T test, int start, int end, int factor) {
        _list = list;
        _test = test;
        _start = start; …
Run Code Online (Sandbox Code Playgroud)

java fork-join work-stealing

2
推荐指数
1
解决办法
1769
查看次数

Java 8:ArrayDeque &lt;&gt;。poll在并行环境中返回null

我正在尝试维护多个线程中的项目列表,每个线程一个(例如,每个线程说一个套接字连接)。我在维护此列表ArrayDeque<>。我面临的问题ArrayDeque<>是超过项目数超过项目数。线程池中的线程数。

这是我的代码:

package com.practice;

import java.util.ArrayDeque;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletableFutureApp implements AutoCloseable {
    private static void sleep(long timeMS) {
        try {
            Thread.sleep(timeMS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try (CompletableFutureApp completableFutureApp = new CompletableFutureApp()) {

            Runnable[] tasksList1 = new Runnable[100];

            for (int i = 0; i < tasksList1.length; i++) {
                String msg1 = "TaskList 1 no.: " + i; …
Run Code Online (Sandbox Code Playgroud)

java arraydeque work-stealing completable-future

1
推荐指数
1
解决办法
28
查看次数

工作窃取算法

我正在阅读一篇关于并发运行时的文章,本文中提到了算法work stealing.但我不知道这个算法是什么!所以我想要一些解释或一些好的链接,可以帮助我做一个关于这个算法的演示文稿.

algorithm queue work-stealing

0
推荐指数
3
解决办法
1万
查看次数