Ale*_*x R 10 java parallel-processing concurrency java-stream completable-future
我有这个:
Stream<CompletableFuture<List<Item>>>
Run Code Online (Sandbox Code Playgroud)
我怎样才能将它转换为
Stream<CompletableFuture<Item>>
Run Code Online (Sandbox Code Playgroud)
其中:第二个流由第一个流中每个列表内的每个项目组成。
我研究了一下thenCompose
,但这解决了一个完全不同的问题,也称为“扁平化”。
如何以流方式高效地完成此操作,而不阻塞或过早消耗不必要的流项目?
这是迄今为止我最好的尝试:
ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;
@SuppressWarnings("unchecked")
CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
for(CompletableFuture<List<IncomingItem>> item: allFutures) {
queue.submit(item::get);
}
List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
queue.submit(() -> THE_END);
return THE_END;
});
queue.submit(() -> ender.get());
Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
boolean checkNext = true;
List<IncomingItem> next = null;
@Override
public boolean hasNext() {
if(checkNext) {
try {
next = queue.take().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
checkNext = false;
}
if(next == THE_END || next == null) {
return false;
}
else {
return true;
}
}
@Override
public List<IncomingItem> next() {
if(checkNext) {
hasNext();
}
if(!hasNext()) {
throw new IllegalStateException();
}
checkNext = true;
return next;
}
};
Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);
Run Code Online (Sandbox Code Playgroud)
这首先有效,不幸的是,它有一个致命的错误:生成的流似乎在检索所有项目之前过早终止。
2023 年更新
四年过去了,我仍然没有选择答案,因为此刻的两个答案都说这是“不可能的”。这听起来不对。我只是要求一种有效的方法来获取批量完成的项目,并能够单独监控它们的完成情况。也许使用 Java 21 中的虚拟线程会更容易?我不再维护这个代码库,但这个问题仍然是一个未解决的大问题。
更新#2:假设的澄清
CompletableFuture
根据https://www.geeksforgeeks.org/completablefuture-in-java/表示异步计算的未来结果...有趣的是,JavaDocs不太清楚。
更新#3:
性能不是问题的重点,只有效率,具体意义上来说:如果任务 1 需要 T1 完成,任务 2 需要 T2 完成,那么等待总时间完成max(T1,T2)
比等待时间更有效率T1+T2
。然后将其扩展到N
一定大小的任务列表T, U, V, ...
,它可能看起来像max(T1..Tn)+max(U1..Un)+max(V1..Vn)
或希望如此max(T1..Vn)
。假设这里所有列表的 N 相同只是一种解释性简化(实际上应该是I
, J
, K
...)。换句话说,请假设列表流中表示的异步任务是 I/O 限制的,而不是 CPU 限制的。Thread.sleep()
如果您想在代码中演示某些内容,可以通过插入随机数来模拟。对于草率的符号表示歉意 - 我有计算机科学背景,但自从我尝试正式描述这样的问题以来已经有一段时间了。
更新#4
根据@Slaw的回答,我可以看到我面临的核心问题是流到达顺序(通过提供期货的顺序Stream
)和未来完成顺序CompletableFuture
(每个完成执行和解锁的顺序)之间的不匹配其中List
)。因此,针对这个问题的新修订的 TL;DR是:如何获取Stream
以任意顺序生成的 a,并将其重新排序为 future 执行完成顺序,以便操作flatMap()
不会不必要地阻塞?
这是一个解决方案,它不仅通过优先处理已经完成的 future 来避免阻塞,甚至还尽可能保留对原始流的延迟处理。只要已经遇到的 future 中有已完成的 future,它就不会在源遍历中前进。
\n这可以通过使用无限源的示例得到最好的证明,该源仍然可以在有限的时间内完成(并且由于对已完成的 future 的偏好而相当快)。
\nStream<CompletableFuture<List<Integer>>> streamOfFutures = Stream.generate(\n () -> CompletableFuture.supplyAsync(\n () -> ThreadLocalRandom.current().ints(10, 0, 200).boxed().toList(),\n CompletableFuture.delayedExecutor(\n ThreadLocalRandom.current().nextLong(5), TimeUnit.SECONDS))\n);\nSystem.out.println(flattenResults(streamOfFutures)\n .peek(System.out::println)\n .anyMatch(i -> i == 123)\n);\n
Run Code Online (Sandbox Code Playgroud)\n该实现将在遇到时立即处理已经完成的 future。仅当未来尚未完成时,排队操作才会被链接,并且挂起的计数器会增加。即使在异常完成的情况下也必须注意减少计数器并将项目(空列表)排队,以解除消费者线程的阻塞,以防它\xe2\x80\x99s在此时获取元素。遇到异常时,异常将传播给调用者。与短路并行流一样,如果在处理所有元素之前找到结果,则可能会错过错误。
\n如果终端操作是短路的并且在没有处理整个流的情况下完成,则计数器是无关的,并且操作将不会等待待处理的 future 的完成。仅当源流已完全遍历时,计数器才与检测所有 future 何时完成相关。
\nstatic <T> Stream<T> flattenResults(Stream<CompletableFuture<List<T>>> stream) {\n Spliterator<CompletableFuture<List<T>>> srcSp = stream.spliterator();\n BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();\n\n return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(\n srcSp.estimateSize(), 0) {\n final AtomicLong pending = new AtomicLong();\n Spliterator<T> fromCurrentList;\n Throwable failure;\n\n @Override\n public boolean tryAdvance(Consumer<? super T> action) {\n if(checkExisting(action)) return true;\n\n while(srcSp.tryAdvance(this::checkNew)) {\n if(checkExisting(action)) return true;\n }\n\n return checkAfterSourceExhausted(action);\n }\n private boolean checkExisting(Consumer<? super T> action) {\n for(;;) {\n var sp = fromCurrentList;\n if(sp == null) {\n List<T> newList = queue.poll();\n if(newList == null) return false;\n fromCurrentList = sp = newList.spliterator();\n }\n if(sp.tryAdvance(action)) return true;\n fromCurrentList = null;\n }\n }\n\n private void checkNew(CompletableFuture<List<T>> f) {\n if(f.isDone()) fromCurrentList = f.join().spliterator();\n else {\n pending.incrementAndGet();\n f.whenComplete((r, t) -> {\n if(t != null) {\n failure = t;\n r = List.of();\n }\n queue.offer(r);\n pending.decrementAndGet();\n });\n }\n }\n\n private boolean checkAfterSourceExhausted(Consumer<? super T> action) {\n while(pending.get() != 0 || !queue.isEmpty()) {\n checkFailure();\n try {\n List<T> newList = queue.take();\n fromCurrentList = newList.spliterator();\n if(checkExisting(action)) return true;\n } catch(InterruptedException ex) {\n throw new CompletionException(ex);\n }\n }\n return false;\n }\n\n private void checkFailure() {\n Throwable t = failure;\n if(t != null) {\n if(t instanceof RuntimeException rt) throw rt;\n if(t instanceof Error e) throw e;\n throw new CompletionException(t);\n }\n }\n }, false);\n}\n
Run Code Online (Sandbox Code Playgroud)\n你可以使用类似的东西
\nStream<CompletableFuture<List<Integer>>> streamOfFutures = IntStream.range(0, 10)\n .mapToObj(i -> \n CompletableFuture.supplyAsync(\n () -> IntStream.range(i * 10, (i + 1) * 10).boxed().toList(),\n CompletableFuture.delayedExecutor(10 - i, TimeUnit.SECONDS)));\n\nSystem.out.println(flattenResults(streamOfFutures)\n .peek(System.out::println)\n .anyMatch(i -> i == 34)\n);\n
Run Code Online (Sandbox Code Playgroud)\n可视化\xe2\x80\x9c先完成,先处理\xe2\x80\x9d。或者将终端操作改为
\nflattenResults(streamOfFutures).forEach(System.out::println);\n
Run Code Online (Sandbox Code Playgroud)\n证明所有期货的完成均得到正确认可或
\nif(flattenResults(streamOfFutures).count() != 100)\n throw new AssertionError();\nelse\n System.out.println("Success");\n
Run Code Online (Sandbox Code Playgroud)\n拥有可以集成到自动化测试中的东西。
\n