使用FutureTask进行并发

byl*_*nan 7 java concurrency multithreading futuretask

我有这样的服务:

class DemoService {
    Result process(Input in) {
        filter1(in);
        if (filter2(in)) return...
        filter3(in);
        filter4(in);
        filter5(in);
        return ...

    }
}
Run Code Online (Sandbox Code Playgroud)

现在我想要它更快,我发现一些过滤器可以同时启动,而一些过滤器必须等待其他过滤器完成.例如:

filter1--
         |---filter3--
filter2--             |---filter5
          ---filter4--
Run Code Online (Sandbox Code Playgroud)

意思是:

1.filter1和filter2可以同时启动,filter3和filter4也是如此

2.filter3和filter4必须等待filter2完成

还有一件事:

如果filter2返回true,则'process'方法立即返回并忽略以下过滤器.

现在我的解决方案是使用FutureTask:

            // do filter's work at FutureTask
        for (Filter filter : filters) {
            FutureTask<RiskResult> futureTask = new FutureTask<RiskResult>(new CallableFilter(filter, context));
            executorService.execute(futureTask);
        }

        //when all FutureTask are submitted, wait for result
        for(Filter filter : filters) {
            if (filter.isReturnNeeded()) {
                FutureTask<RiskResult> futureTask = context.getTask(filter.getId());
                riskResult = futureTask.get();
                if (canReturn(filter, riskResult)) {
                    returnOk = true;
                    return riskResult;
                }
            }
        }
Run Code Online (Sandbox Code Playgroud)

我的CallableFilter:

public class CallableFilter implements Callable<RiskResult> {

    private Filter filter;
    private Context context;

    @Override
    public RiskResult call() throws Exception {
        List<Filter> dependencies = filter.getDependentFilters();
        if (dependencies != null && dependencies.size() > 0) {

            //wait for its dependency filters to finish
            for (Filter d : dependencies) {
                FutureTask<RiskResult> futureTask = context.getTask(d.getId());
                futureTask.get();

            }
        }

        //do its own work
        return filter.execute(context);
    }
}
Run Code Online (Sandbox Code Playgroud)

我想知道:

1.在案例中使用FutureTask是个好主意吗?有更好的解决方案吗?

2.线程上下文切换的开销.

谢谢!

lba*_*scs 6

在Java 8中,您可以使用CompletableFuture将过滤器链接在一起.使用thenApply和thenCompose系列方法,以便向CompletableFuture添加新的异步过滤器 - 它们将在上一步完成后执行.thenCombine结合两个独立的CompletableFutures.使用allOf等待两个以上CompletableFuture对象的结果.

如果您不能使用Java 8,那么Guava ListenableFuture也可以这样做,请参阅Listenable Future Explained.使用Guava,您可以等待多个独立运行的过滤器以Futures.allAsList结束 - 这也会返回一个ListenableFuture.

使用这两种方法的想法是,在声明未来的操作,它们之间的相互依赖关系以及它们的线程之后,您将获得一个Future对象,它封装了您的最终结果.

编辑:可以通过使用complete()方法或使用Guava SettableFuture(实现ListenableFuture)显式完成CompletableFuture来实现早期返回