标签: executorservice

生产者/消费者工作队列

我正在努力实现我的处理管道的最佳方法.

我的制作人将作品提供给BlockingQueue.在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}
Run Code Online (Sandbox Code Playgroud)

这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽我的工作队列并填充任务队列,随着任务的完成,队列会慢慢排空.

我意识到我可以在生产者端排队任务,但我真的不愿意这样做 - 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 它真的不是生产者的任何事情会发生在他们身上.迫使生产者对Runnable或Callable进行排队会破坏抽象,恕我直言.

但我确实希望共享工作队列代表当前的处理状态.如果消费者没有跟上,我希望能够阻止生产者.

我喜欢使用Executors,但我觉得我正在与他们的设计作斗争.我可以部分喝Kool-ade,还是我必须吞下它?我是否正在抵制排队任务?(我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕.)

建议?

java concurrency producer-consumer executorservice blockingqueue

10
推荐指数
1
解决办法
2万
查看次数

Java库维护进程池

我正在开发一个需要并发的Java守护进程:一个无限循环,它监听作业队列(redis)并将每个作业分配给一个worker.工作人员不一定要返回值.

我发现Executors非常有用,我使用ThreadPoolExecutor来维护许多工作线程.

但是,这些工作者运行的第三方代码需要尽可能隔离,避免共享静态属性.

我的问题:是否有任何Java库/框架提供类似于Executors的功能,例如:

  • 工人池
  • 自动调整池大小

..但产生进程而不是线程?

java concurrency multithreading executorservice threadpool

10
推荐指数
1
解决办法
2924
查看次数

Java ExecutorService暂停/恢复特定线程

有没有办法使用ExecutorService来暂停/恢复特定的线程?

private static ExecutorService threadpool = Executors.newFixedThreadPool(5);

想象一下,我想要停止作为id = 0的线程(假设为每个线程分配一个增量id,直到达到线程池的大小).

过了一会儿,按一个按钮让我们说,我想恢复那个特定的线程并让所有其他线程保持其当前状态,可以暂停或恢复.

我在Java文档中找到了一个未完成的PausableThreadPoolExecutor版本.但它不适合我需要的东西,因为它恢复了池中的所有线程.

如果没有办法使用ExecutorService的默认实现,那么任何人都可以指出这个问题的Java实现吗?

谢谢!

java concurrency multithreading executorservice threadpool

10
推荐指数
1
解决办法
1万
查看次数

如何使用Spring正确关闭执行程序服务?

我有一个命令行应用程序,它使用由以下ExecutorService创建的java组成的Spring管理的bean :

ExecutorService service = Executors.newFixedThreadPool(4);
Run Code Online (Sandbox Code Playgroud)

现在,我希望我的服务在我的应用程序关闭时关闭,所以我让我的bean实现了DisposableBean接口并且有一个destroy方法,例如:

public void destroy(){
  service.shutdown();
}
Run Code Online (Sandbox Code Playgroud)

然后我可能想做一些事情,比如在Spring上下文中注册一个关闭钩子.然而,我发现(艰难的方式,即在预生产版本中)这不起作用:在调用ExecutorService.shutdown()方法之前不会调用shutdown钩子,导致经典的catch 22问题(它会被调用中断,即,如果我在应用程序运行时按Ctrl-C).这逃过了我的单元测试,因为出于某种原因,它似乎在JUnit中工作正常,这仍然令我感到困惑:JUnit的做法有何不同?

到目前为止我找到的解决方案是ApplicationContext.close()在退出main函数之前显式调用.我想知道是否有更好的解决方案,以及有什么是Spring管理的灵活线程池的最佳实践.如果我的bean 不是由Spring直接管理但是由Spring管理的bean创建的呢?我应该将调用级联到destroy()吗?这不是很容易出错吗?

我感谢任何评论,建议,进一步阅读,RTFM,魔术食谱.

谢谢!

java junit spring multithreading executorservice

10
推荐指数
2
解决办法
2万
查看次数

如何配置和调整Akka Dispatchers

我在这里查看文档:http://doc.akka.io/docs/akka/2.3.3/java/dispatchers.html

我们以这样的方式使用Akka,我们为不同的actor提供了两个独立的调度程序(默认的fork-join执行程序).我们现在遇到一些性能问题,我们正在研究如何调整调度程序配置参数并查看它们如何影响应用程序的性能.

我查看了文档,但并不真正了解配置参数.例如,仅针对简单的默认值,fork-join-executor调度程序:

这些是什么以及如何配置它们以了解它们如何影响应用程序性能?

# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
Run Code Online (Sandbox Code Playgroud)

谢谢!

java executorservice akka forkjoinpool

10
推荐指数
1
解决办法
6102
查看次数

如何在多线程环境中更好地使用ExecutorService?

我需要创建一个库,在其中我将有同步和异步方法.

  • executeSynchronous() - 等到我有结果,返回结果.
  • executeAsynchronous() - 立即返回Future,如果需要,可在其他事情完成后处理.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它.然后我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建DataResponse对象将该JSON字符串发送回我们的客户.有些客户会打电话executeSynchronous(),有些人可能会打电话executeAsynchronous(),这就是为什么我需要在我的库中单独提供两种方法.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)

然后我有我DataClient实现上面的Client接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = …
Run Code Online (Sandbox Code Playgroud)

java multithreading daemon callable executorservice

10
推荐指数
2
解决办法
1280
查看次数

.awaitTermination()是否在执行程序中完成工作之前发生了?

问题我已经多年了:在这个伪代码中,

ExecutorService svc = Executors.newFixedThreadPool(3);
svc.submit(new Runnable() { /* code A */ });
svc.shutdown();
if(svc.awaitTermination(...)) {
    // code B
Run Code Online (Sandbox Code Playgroud)

.awaitTermination()没有记录在代码A和B之间发生的事情. 有没有理由不是

ExecutorService的concurrent包的javadoc定义的之前发生完成的任务和工作之间之前,他们已提交,但不执行的任务和代码之间的成功后.awaitTermination()调用.

注意,我不是要求设计批评如何重构我的代码以利用记录的事先关系.我的问题是,在这种情况下,有没有理由说文档没有提及?

(请注意,尽管标题非常贴切,但这并不是22665198的重复.)

java concurrency executorservice memory-barriers happens-before

10
推荐指数
1
解决办法
271
查看次数

如何等待一个产生它自己的线程的线程?

我正在尝试测试一个在单独的线程中工作的方法,简化它是这样的:

public void methodToTest()
{
    Thread thread = new Thread()
    {
        @Override
        public void run() {
            Clazz.i = 2;
        }
    };
    thread.start();
}
Run Code Online (Sandbox Code Playgroud)

在我的单元测试中,我想测试Clazz.i == 2,但我不能这样做,因为我认为断言是在线程更改值之前运行的.我想使用另一个线程来测试它,然后使用join等待,但它仍然无法正常工作.

SSCCE:

@Test
public void sscce() throws InterruptedException
{
    Thread thread = new Thread()
    {
        @Override
        public void run() {
            methodToTest()
        }
    };
    thread.start();     
    thread.join();  
    AssertEquals(2, Clazz.i);
}

public static class Clazz
{
    public static int i = 0;
}
Run Code Online (Sandbox Code Playgroud)

我认为这是因为测试主代码创​​建了一个等待(加入)第二个线程的线程,但是第二个线程没有完成工作,它创建另一个线程来完成工作然后完成,这继续第一个线程,而第三个线程Clazz.i = 2在断言后执行.

我怎样才能使第一个线程等待它启动的线程以及该线程启动的任何线程

java junit multithreading executorservice countdownlatch

10
推荐指数
1
解决办法
1100
查看次数

ExecutorService,按顺序执行任务但从池中获取线程

我正在尝试使用以下属性构建一个实现ExecutorService,让我们调用它SequentialPooledExecutor.

  1. SequentialPooledExecutor共享相同线程池的所有实例

  2. 对相同实例的调用SequentialPooledExecutor按顺序执行.

换句话说,实例在开始处理其队列中的下一个任务之前等待当前正在执行的任务的终止.

我目前正在实施SequentialPooledExecutor自己,但我想知道我是否正在重新发明轮子.我研究了不同的实现ExecutorService,例如那些由Executors类提供的实现,但我找不到符合我要求的实现.

你知道我现有的实现是否缺失,还是我应该继续自己实现界面?

编辑:

我认为我的要求不是很清楚,让我们看看我是否可以用其他的话来解释它.

假设我有一系列会话,比如1000个(我以前称之为执行程序实例的东西).我可以向会话提交任务,我希望保证提交给同一会话的所有任务都按顺序执行.但是,属于不同会话的任务应该彼此没有依赖关系.

我想定义一个ExecutorService执行这些任务但是使用有限数量的线程,比方说200,但确保在同一个会话中的前一个任务完成之前没有启动任务.

我不知道是否存在已经存在的任何事情,或者我是否应该实施这样的ExecutorService事情.

java multithreading executorservice threadpool threadpoolexecutor

10
推荐指数
2
解决办法
3911
查看次数

ExecutorService,用于扩展线程,然后对任务进行排队

是否有一个ExecutorService实现行为类似于具有以下特征的线程池?

  1. 总是至少有X个活动线程.
  2. 如果提交任务并且所有活动线程都忙,则启动新线程,最多为Y个线程.
  3. 如果提交任务并且所有Y个线程都忙,则该任务将排队.
  4. 如果未提交任何新任务,则池将缩减为X活动线程.

相当标准的线程池行为.你认为ThreadPoolExecutor会处理这个,但是

executorService = new ThreadPoolExecutor(
    2, 10, // min/max threads
    60, TimeUnit.SECONDS, // time of inactivity before scaling back
    new SynchronousQueue<Runnable>()); // task queue
Run Code Online (Sandbox Code Playgroud)

如果提交的任务超过10个,则会抛出异常.切换到a LinkedBlockingQueue将永远不会扩展到两个最小线程之外,除非你限制大小new LinkedBlockingQueue<Runnable>(20),在这种情况下,将有两个线程处理1-20个任务,2-10个线程处理21-30个任务,并且异常超过30个任务.不漂亮.同时,固定的线程池永远不会缩小非活动线程.

那么,为了得到我所追求的东西,我可以使用其他类型BlockingQueue或小提琴我错过的其他设置吗?是否有另一个ExceutorService更适合的实现(哪个?),或者我最好通过重写execute()方法来自己滚动ThreadPoolExecutor

java multithreading executorservice

9
推荐指数
1
解决办法
2057
查看次数