我正在努力实现我的处理管道的最佳方法.
我的制作人将作品提供给BlockingQueue.在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService.
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
Run Code Online (Sandbox Code Playgroud)
这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽我的工作队列并填充任务队列,随着任务的完成,队列会慢慢排空.
我意识到我可以在生产者端排队任务,但我真的不愿意这样做 - 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 它真的不是生产者的任何事情会发生在他们身上.迫使生产者对Runnable或Callable进行排队会破坏抽象,恕我直言.
但我确实希望共享工作队列代表当前的处理状态.如果消费者没有跟上,我希望能够阻止生产者.
我喜欢使用Executors,但我觉得我正在与他们的设计作斗争.我可以部分喝Kool-ade,还是我必须吞下它?我是否正在抵制排队任务?(我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕.)
建议?
java concurrency producer-consumer executorservice blockingqueue
我正在开发一个需要并发的Java守护进程:一个无限循环,它监听作业队列(redis)并将每个作业分配给一个worker.工作人员不一定要返回值.
我发现Executors非常有用,我使用ThreadPoolExecutor来维护许多工作线程.
但是,这些工作者运行的第三方代码需要尽可能隔离,避免共享静态属性.
我的问题:是否有任何Java库/框架提供类似于Executors的功能,例如:
..但产生进程而不是线程?
有没有办法使用ExecutorService来暂停/恢复特定的线程?
private static ExecutorService threadpool = Executors.newFixedThreadPool(5);
想象一下,我想要停止作为id = 0的线程(假设为每个线程分配一个增量id,直到达到线程池的大小).
过了一会儿,按一个按钮让我们说,我想恢复那个特定的线程并让所有其他线程保持其当前状态,可以暂停或恢复.
我在Java文档中找到了一个未完成的PausableThreadPoolExecutor版本.但它不适合我需要的东西,因为它恢复了池中的所有线程.
如果没有办法使用ExecutorService的默认实现,那么任何人都可以指出这个问题的Java实现吗?
谢谢!
我有一个命令行应用程序,它使用由以下ExecutorService创建的java组成的Spring管理的bean :
ExecutorService service = Executors.newFixedThreadPool(4);
Run Code Online (Sandbox Code Playgroud)
现在,我希望我的服务在我的应用程序关闭时关闭,所以我让我的bean实现了DisposableBean接口并且有一个destroy方法,例如:
public void destroy(){
service.shutdown();
}
Run Code Online (Sandbox Code Playgroud)
然后我可能想做一些事情,比如在Spring上下文中注册一个关闭钩子.然而,我发现(艰难的方式,即在预生产版本中)这不起作用:在调用ExecutorService.shutdown()方法之前不会调用shutdown钩子,导致经典的catch 22问题(它会被调用中断,即,如果我在应用程序运行时按Ctrl-C).这逃过了我的单元测试,因为出于某种原因,它似乎在JUnit中工作正常,这仍然令我感到困惑:JUnit的做法有何不同?
到目前为止我找到的解决方案是ApplicationContext.close()在退出main函数之前显式调用.我想知道是否有更好的解决方案,以及有什么是Spring管理的灵活线程池的最佳实践.如果我的bean 不是由Spring直接管理但是由Spring管理的bean创建的呢?我应该将调用级联到destroy()吗?这不是很容易出错吗?
我感谢任何评论,建议,进一步阅读,RTFM,魔术食谱.
谢谢!
我在这里查看文档:http://doc.akka.io/docs/akka/2.3.3/java/dispatchers.html
我们以这样的方式使用Akka,我们为不同的actor提供了两个独立的调度程序(默认的fork-join执行程序).我们现在遇到一些性能问题,我们正在研究如何调整调度程序配置参数并查看它们如何影响应用程序的性能.
我查看了文档,但并不真正了解配置参数.例如,仅针对简单的默认值,fork-join-executor调度程序:
这些是什么以及如何配置它们以了解它们如何影响应用程序性能?
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
Run Code Online (Sandbox Code Playgroud)
谢谢!
我需要创建一个库,在其中我将有同步和异步方法.
executeSynchronous() - 等到我有结果,返回结果.executeAsynchronous() - 立即返回Future,如果需要,可在其他事情完成后处理.我的图书馆的核心逻辑
客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它.然后我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建DataResponse对象将该JSON字符串发送回我们的客户.有些客户会打电话executeSynchronous(),有些人可能会打电话executeAsynchronous(),这就是为什么我需要在我的库中单独提供两种方法.
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)
然后我有我DataClient实现上面的Client接口:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// do I need to have all threads as non-daemon or I can have daemon thread for my use case?
private ExecutorService executor = …Run Code Online (Sandbox Code Playgroud) 问题我已经多年了:在这个伪代码中,
ExecutorService svc = Executors.newFixedThreadPool(3);
svc.submit(new Runnable() { /* code A */ });
svc.shutdown();
if(svc.awaitTermination(...)) {
// code B
Run Code Online (Sandbox Code Playgroud)
.awaitTermination()没有记录在代码A和B之间发生的事情. 有没有理由不是?
该ExecutorService的和concurrent包的javadoc定义的之前发生完成的任务和工作之间之前,他们已提交,但不执行的任务和代码之间的成功后.awaitTermination()调用.
注意,我不是要求设计批评如何重构我的代码以利用记录的事先关系.我的问题是,在这种情况下,有没有理由说文档没有提及?
(请注意,尽管标题非常贴切,但这并不是22665198的重复.)
java concurrency executorservice memory-barriers happens-before
我正在尝试测试一个在单独的线程中工作的方法,简化它是这样的:
public void methodToTest()
{
Thread thread = new Thread()
{
@Override
public void run() {
Clazz.i = 2;
}
};
thread.start();
}
Run Code Online (Sandbox Code Playgroud)
在我的单元测试中,我想测试Clazz.i == 2,但我不能这样做,因为我认为断言是在线程更改值之前运行的.我想使用另一个线程来测试它,然后使用join等待,但它仍然无法正常工作.
SSCCE:
@Test
public void sscce() throws InterruptedException
{
Thread thread = new Thread()
{
@Override
public void run() {
methodToTest()
}
};
thread.start();
thread.join();
AssertEquals(2, Clazz.i);
}
public static class Clazz
{
public static int i = 0;
}
Run Code Online (Sandbox Code Playgroud)
我认为这是因为测试主代码创建了一个等待(加入)第二个线程的线程,但是第二个线程没有完成工作,它创建另一个线程来完成工作然后完成,这继续第一个线程,而第三个线程Clazz.i = 2在断言后执行.
我怎样才能使第一个线程等待它启动的线程以及该线程启动的任何线程?
我正在尝试使用以下属性构建一个实现ExecutorService,让我们调用它SequentialPooledExecutor.
SequentialPooledExecutor共享相同线程池的所有实例
对相同实例的调用SequentialPooledExecutor按顺序执行.
换句话说,实例在开始处理其队列中的下一个任务之前等待当前正在执行的任务的终止.
我目前正在实施SequentialPooledExecutor自己,但我想知道我是否正在重新发明轮子.我研究了不同的实现ExecutorService,例如那些由Executors类提供的实现,但我找不到符合我要求的实现.
你知道我现有的实现是否缺失,还是我应该继续自己实现界面?
编辑:
我认为我的要求不是很清楚,让我们看看我是否可以用其他的话来解释它.
假设我有一系列会话,比如1000个(我以前称之为执行程序实例的东西).我可以向会话提交任务,我希望保证提交给同一会话的所有任务都按顺序执行.但是,属于不同会话的任务应该彼此没有依赖关系.
我想定义一个ExecutorService执行这些任务但是使用有限数量的线程,比方说200,但确保在同一个会话中的前一个任务完成之前没有启动任务.
我不知道是否存在已经存在的任何事情,或者我是否应该实施这样的ExecutorService事情.
java multithreading executorservice threadpool threadpoolexecutor
是否有一个ExecutorService实现行为类似于具有以下特征的线程池?
相当标准的线程池行为.你认为ThreadPoolExecutor会处理这个,但是
executorService = new ThreadPoolExecutor(
2, 10, // min/max threads
60, TimeUnit.SECONDS, // time of inactivity before scaling back
new SynchronousQueue<Runnable>()); // task queue
Run Code Online (Sandbox Code Playgroud)
如果提交的任务超过10个,则会抛出异常.切换到a LinkedBlockingQueue将永远不会扩展到两个最小线程之外,除非你限制大小new LinkedBlockingQueue<Runnable>(20),在这种情况下,将有两个线程处理1-20个任务,2-10个线程处理21-30个任务,并且异常超过30个任务.不漂亮.同时,固定的线程池永远不会缩小非活动线程.
那么,为了得到我所追求的东西,我可以使用其他类型BlockingQueue或小提琴我错过的其他设置吗?是否有另一个ExceutorService更适合的实现(哪个?),或者我最好通过重写execute()方法来自己滚动ThreadPoolExecutor?
executorservice ×10
java ×10
concurrency ×4
threadpool ×3
junit ×2
akka ×1
callable ×1
daemon ×1
forkjoinpool ×1
spring ×1