标签: executorservice

具有可变延迟的ScheduledExecutorService

假设我有一个从java.util.concurrent.BlockingQueue中提取元素并处理它们的任务.

public void scheduleTask(int delay, TimeUnit timeUnit)
{
    scheduledExecutorService.scheduleWithFixedDelay(new Task(queue), 0, delay, timeUnit);
}
Run Code Online (Sandbox Code Playgroud)

如果可以动态更改频率,如何安排/重新安排任务?

  • 我们的想法是获取数据更新流并将它们批量传播到GUI
  • 用户应该能够改变更新的频率

java concurrency executorservice blockingqueue

20
推荐指数
3
解决办法
1万
查看次数

从Callable.call()抛出异常的位置

可能重复:
处理Java ExecutorService任务中的异常

我使用ExecutorServicefrom Java来协调Threads.用于启动我使用的线程

pool = new ExecutorService(2);
callableResults = pool.invokeAll(threads);
Run Code Online (Sandbox Code Playgroud)

为了收集结果,我使用future.get()每个线程."threads"是实现Callable和覆盖的类中的对象列表call().

现在我遇到了以下问题.该方法call()确实抛出了各种特定的异常.invokeAll()并且future.get()只投掷InterruptedException.

我在哪里可以捕获我投入的具体例外情况call()?或者我必须在那里处理它们?如果抛出其中一个异常,那么结果是InterruptedException

java multithreading exception callable executorservice

20
推荐指数
1
解决办法
3万
查看次数

超出范围时,ExecutorService是否会收集垃圾?

我问这个问题是因为我正在创建大量的执行程序服务,虽然我可能已经在需要调查的地方发生了内存泄漏,但我认为最近对以下代码的更改实际上使其恶化,因此我试图确认到底是怎么回事:

@FunctionalInterface
public interface BaseConsumer extends Consumer<Path> {
    @Override
    default void accept(final Path path) {
        String name = path.getFileName().toString();
        ExecutorService service = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "documentId=" + name);
            thread.setDaemon(true);
            return thread;
        });
        Future<?> future = service.submit(() -> {
            baseAccept(path);
            return null;
        });
        try {
            future.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException ex) {
            throw new RuntimeException(ex);
        }
    }

    void baseAccept(final Path path) throws Exception;
}
Run Code Online (Sandbox Code Playgroud)

然后Consumer<Path>在另一个具有(通常)N = 2个线程的线程池上调用它,我不确定这是否相关.

问题是:一旦完成,是否ExecutorService …

java multithreading garbage-collection executorservice java-8

19
推荐指数
2
解决办法
6438
查看次数

ExecutorService令人惊讶的性能收支平衡点---经验法则?

我正在试图弄清楚如何正确使用Java的Executors.我意识到将任务提交给ExecutorService有自己的开销.但是,我很惊讶它看到它的高度.

我的程序需要以尽可能低的延迟处理大量数据(股票市场数据).大多数计算都是相当简单的算术运算.

我试着测试一些非常简单的东西:" Math.random() * Math.random()"

最简单的测试在一个简单的循环中运行这个计算.第二个测试在匿名Runnable中进行相同的计算(这应该衡量创建新对象的成本).第三测试传递Runnable到一个ExecutorService(在此测定引入执行人的成本).

我在我的小型笔记本电脑上运行测试(2 cpus,1.5 gig ram):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422
Run Code Online (Sandbox Code Playgroud)

(大约四次运行中,前两个数字最终相等)

请注意,执行程序所花费的时间远远多于在单个线程上执行的时间.对于1到8之间的线程池大小,数字大致相同.

问题:我是否遗漏了一些明显的或者预期的结果?这些结果告诉我,我传递给执行程序的任何任务都必须进行一些非平凡的计算.如果我正在处理数百万条消息,并且我需要对每条消息执行非常简单(且便宜)的转换,我仍然可能无法使用执行程序...尝试在多个CPU之间传播计算可能最终会比仅仅更昂贵在一个线程中完成它们.设计决策变得比我原先想象的要复杂得多.有什么想法吗?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start)); …
Run Code Online (Sandbox Code Playgroud)

java performance executorservice

18
推荐指数
3
解决办法
2万
查看次数

是否可以向ThreadPoolExecutor的BlockingQueue添加任务?

ThreadPoolExecutor的JavaDoc 不清楚是否可以将任务直接添加到BlockingQueue执行程序的后台.文档称调用executor.getQueue()"主要用于调试和监视".

我正ThreadPoolExecutor用自己的方式构建一个BlockingQueue.我保留对队列的引用,以便我可以直接向其添加任务.返回相同的队列,getQueue()因此我假设admonition getQueue()适用于通过我的方式获取的对后备队列的引用.

代码的一般模式是:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();
Run Code Online (Sandbox Code Playgroud)

queue.offer() VS executor.execute()

据我了解,典型的用途是通过添加任务executor.execute().上面示例中的方法具有阻塞队列的优点,但execute()如果队列已满并立即失败并拒绝我的任务.我也喜欢提交作业与阻塞队列交互; 对我来说,这感觉更"纯粹"的生产者 …

java concurrency producer-consumer executorservice blockingqueue

18
推荐指数
2
解决办法
9474
查看次数

如何决定是使用newCachedThreadPool还是newFixedThreadPool?

我正在开发一个项目,我需要确保每个线程都在特定范围内工作.例如:

NO_OF_THREADS: 2
NO_OF_TASKS: 10
Run Code Online (Sandbox Code Playgroud)

如果number of threads is 2number of tasks is 10则每个线程将被执行10 tasks.这意味着2个线程将会这样做20 tasks.

在实际场景中,这些数字(任务数和线程数)将非常高,因为它们都可以在我的代码中配置.

在上面的例子中,first thread应该使用id之间1 and 10和之间second thread应该使用id 11 and 20等等,如果有更多的线程.之后,每个线程将建立数据库连接,然后插入数据库.

所以我的下面的代码工作正常.

public static void main(String[] args) {

    final int noOfThreads = 2;
    final int noOfTasks = 10;

    //create thread pool with given size 
    ExecutorService service = Executors.newFixedThreadPool(noOfThreads);

    // queue some tasks 
    for (int i = 0, int nextId = 1; …
Run Code Online (Sandbox Code Playgroud)

java multithreading executorservice

18
推荐指数
1
解决办法
1万
查看次数

在ThreadPoolExecutor中使用InheritableThreadLocal - 或者 - 不重用线程的ThreadPoolExecutor

我想用两个InheritableThreadLocal和一个ThreadPoolExecutor.

这会因为ThreadPoolExecutor重用每个池的线程而崩溃(毕竟它是一个池),这意味着InheritableThreadLocal它不能按预期工作.现在这个问题对我来说显而易见,但追踪却特别狡猾.

我使用InheritableThreadLocal这样几个顶级进程中的每一个都有自己的数据库连接以及它产生的任何子进程.我不只是使用一个共享连接池,因为在提交到数据库和/或准备大量反复使用的PreparedStatements之前,每个顶级进程都会对其连接进行大量的多步骤工作.

我使用ThreadPoolExecutor这些顶级进程之间的共享,因为某些行为需要进行门控.尽管我可能有4个顶级进程在运行,但我一次只能有一个进程写入数据库(或者系统需要访问其他一些共享资源).因此,我将让顶级进程创建Runnable并将其发送到共享ThreadPoolExecutor,以确保在整个系统中同时运行不超过一个(或两个或三个).

问题在于,因为ThreadPoolExecutor重用了池中的线程,所以它将获取在该池中 InheritableThreadLocal运行的原始值,而不是将Runnable发送到的顶级进程中的值ThreadPoolExecutor.

  • 有没有办法强制工作池ThreadPoolExecutor使用InheritableThreadLocal创建Runnable的进程上下文中的值而不是在重用线程池的上下文中?

  • 或者,是否有任何实现在ThreadPoolExecutor每次启动新的Runnable时创建一个新线程?出于我的目的,我只关心将同时运行的线程的数量设置为固定大小.

  • 有没有其他解决方案或建议人们让我完成我上面描述的内容?

(虽然我意识到我可以通过将类数据库连接从类传递到子线程到子线程来解决问题,就像某种社区自行车一样,我想避免这种情况.)

还有一个关于StackOverflow,InheritableThreadLocal和线程池的问题也解决了这个问题.但是,该问题的解决方案似乎是它对于InheritableThreadLocal来说是一个糟糕的用例,我不认为这适用于我的情况.

谢谢你的任何想法.

java multithreading executorservice

17
推荐指数
2
解决办法
6572
查看次数

Java线程池/执行器服务和wait() - 线程和任务队列会发生什么?

我环顾四周但没有找到答案,所以我想确认一下.

假设我有一个固定大小的线程池 - ExecutorService pool = Executors.newFixedThreadPool(5);

我有一些代码:

pool.execute(new Runnable(){
    try{
        Object waitForMe = doSomethingAndGetObjectToWaitFor();
        waitForMe.wait();
        doSomethingElse();
    }catch(Exception e){ throw new RunTimeException(e) } 

});
Run Code Online (Sandbox Code Playgroud)

让我们假设上面的代码被称为几百次.池中只有5个线程(因此上述语句中只有5个应该在一个点上存在).还假设wait()在一个对象上对一个thrid方做了一些I/O调用,并在操作完成时等待回调,所以它自然需要一段时间才能完成.

现在我的问题是当其中一个任务到达a时的行为是什么wait(),任务是否进入休眠状态,然后线程池中的线程从队列中取出另一个任务并开始运行它?

如果正在等待的任务进入睡眠状态,当它获得notify()并唤醒时会发生什么?线程是否返回到线程池的队列(前面或后面)并等待,直到5个线程中的一个可以继续执行它(即调用doSomethingelse())?或者正在执行它的线程是否也进入休眠状态,即5个执行程序线程中的一个正在等待任务(这是我假设的)?或者执行程序线程是否接受另一个任务,并在第一个任务从wait()返回时被中断?

java concurrency multithreading executorservice threadpool

17
推荐指数
1
解决办法
1万
查看次数

了解Java FixedThreadPool

我试图了解Java FixedThreadPool在实践中如何工作,但文档没有回答我的问题.

假设一个简单的场景,如:

ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
   FL.add(ES.submit(new Task()));
}
ES.shutdown();
Run Code Online (Sandbox Code Playgroud)

这里TaskCallable它构建一些资源,使用它们,然后返回一些输出.

我的问题:Task完成for循环后有多少内存?换句话说:Task一次只有3 个构建资源,或者所有资源都是预先创建的,这样,在.submit我有200个Task(及其资源)等待执行之后?

注意:资源构造发生在构造函数中Task,而不是在call()方法中.

在javadoc中(随意跳过以下内容):让我感到困惑的是Java文档中的以下解释

创建一个线程池,该线程池重用在共享的无界队列中运行的固定数量的线程.在任何时候,最多nThreads线程将是活动的处理任务.

我想这意味着,在我的示例中,所有200个任务都在队列中,但其中只有3个随时执行.

任何帮助都非常感谢.

java multithreading callable executorservice

17
推荐指数
3
解决办法
2994
查看次数

从4.2.0.RC3升级到4.2.0.RELEASE时出现Spring异步问题

我有一个使用spring(4.2.x)工件的web应用程序spring-webmvc,spring-messaging,spring-websocket

我在spring config java类中使用了以下@ Enable*annotations

@EnableWebMvc
@EnableWebSocketMessageBroker
@EnableAsync
@EnableMBeanExport
Run Code Online (Sandbox Code Playgroud)

WebSocket用于向浏览器客户端广播消息.并且很少有使用@Async注释的异步方法

春季版本4.2.0.RC3的应用程序运行良好.但当我将其更改为GA版本4.2.0.RELEASE时,我在启动时得到以下异常.如果我删除@EnableAsync它工作正常,但我需要异步功能.

org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type [org.springframework.core.task.TaskExecutor] is defined: expected single matching bean but found 4: clientOutboundChannelExecutor,messageBrokerTaskScheduler,clientInboundChannelExecutor,brokerChannelExecutor
    org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:366)
    org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:332)
    org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor.setBeanFactory(AsyncAnnotationBeanPostProcessor.java:128)
    org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeAwareMethods(AbstractAutowireCapableBeanFactory.java:1597)
    org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1565)
    org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545)
    org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:482)
    org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:305)
    org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
    org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:301)
    org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:201)
    org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
    org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:682)
    org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:522)
    org.springframework.web.servlet.FrameworkServlet.configureAndRefreshWebApplicationContext(FrameworkServlet.java:667)
    org.springframework.web.servlet.FrameworkServlet.initWebApplicationContext(FrameworkServlet.java:539)
    org.springframework.web.servlet.FrameworkServlet.initServletBean(FrameworkServlet.java:493)
    org.springframework.web.servlet.HttpServletBean.init(HttpServletBean.java:136)
Run Code Online (Sandbox Code Playgroud)

java spring spring-mvc executorservice spring-websocket

17
推荐指数
2
解决办法
1万
查看次数