是否可以为Java 8 并行流指定自定义线程池?我找不到任何地方.
想象一下,我有一个服务器应用程序,我想使用并行流.但是应用程序很大且是多线程的,因此我想将它划分为区分.我不想在另一个模块的应用程序块任务的一个模块中执行缓慢的任务.
如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实情况下安全地使用并行流.
请尝试以下示例.在单独的线程中执行一些CPU密集型任务.这些任务利用并行流.第一个任务被破坏,因此每个步骤需要1秒(通过线程休眠模拟).问题是其他线程卡住并等待损坏的任务完成.这是一个人为的例子,但想象一下servlet应用程序和有人向共享fork连接池提交长时间运行的任务.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor …
Run Code Online (Sandbox Code Playgroud) 我想转换List<CompletableFuture<X>>
成CompletableFuture<List<T>>
.这非常有用,因为当您有许多异步任务并且需要获得所有异步任务的结果时.
如果其中任何一个失败,则最终的未来将失败.这就是我实施的方式:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}
Run Code Online (Sandbox Code Playgroud)
要运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace(); …
Run Code Online (Sandbox Code Playgroud) CompletableFuture::supplyAsync(() -> IO bound queries)
如何为CompletableFuture :: supplyAsync选择Executor以避免污染ForkJoinPool.commonPool()
.
有许多选项Executors
(newCachedThreadPool
,newWorkStealingPool
,newFixedThreadPool
等)
我在这里阅读了关于新ForkJoinPool的内容
如何为我的用例选择合适的?
java executorservice java-8 threadpoolexecutor completable-future