如何在可完成的 future 流中展平列表?

Ale*_*x R 10 java parallel-processing concurrency java-stream completable-future

我有这个:

Stream<CompletableFuture<List<Item>>>
Run Code Online (Sandbox Code Playgroud)

我怎样才能将它转换为

Stream<CompletableFuture<Item>>
Run Code Online (Sandbox Code Playgroud)

其中:第二个流由第一个流中每个列表内的每个项目组成。

我研究了一下thenCompose,但这解决了一个完全不同的问题,也称为“扁平化”。

如何以流方式高效地完成此操作,而不阻塞或过早消耗不必要的流项目?

这是迄今为止我最好的尝试:

    ExecutorService pool = Executors.newFixedThreadPool(PARALLELISM);
    Stream<CompletableFuture<List<IncomingItem>>> reload = ... ;

    @SuppressWarnings("unchecked")
    CompletableFuture<List<IncomingItem>> allFutures[] = reload.toArray(CompletableFuture[]::new);
    CompletionService<List<IncomingItem>> queue = new ExecutorCompletionService<>(pool);
    for(CompletableFuture<List<IncomingItem>> item: allFutures) {
        queue.submit(item::get);
    }
    List<IncomingItem> THE_END = new ArrayList<IncomingItem>();
    CompletableFuture<List<IncomingItem>> ender = CompletableFuture.allOf(allFutures).thenApply(whatever -> {
        queue.submit(() -> THE_END);
        return THE_END;
    });
    queue.submit(() -> ender.get());
    Iterable<List<IncomingItem>> iter = () -> new Iterator<List<IncomingItem>>() {
        boolean checkNext = true;
        List<IncomingItem> next = null;
        @Override
        public boolean hasNext() {
            if(checkNext) {
                try {
                    next = queue.take().get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
                checkNext = false;
            }
            if(next == THE_END || next == null) {
                return false;
            }
            else {
                return true;
            }
        }
        @Override
        public List<IncomingItem> next() {
            if(checkNext) {
                hasNext();
            }
            if(!hasNext()) {
                throw new IllegalStateException();
            }
            checkNext = true;
            return next;
        }
    };
    Stream<IncomingItem> flat = StreamSupport.stream(iter.spliterator(), false).flatMap(List::stream);
Run Code Online (Sandbox Code Playgroud)

这首先有效,不幸的是,它有一个致命的错误:生成的流似乎在检索所有项目之前过早终止。

2023 年更新

四年过去了,我仍然没有选择答案,因为此刻的两个答案都说这是“不可能的”。这听起来不对。我只是要求一种有效的方法来获取批量完成的项目,并能够单独监控它们的完成情况。也许使用 Java 21 中的虚拟线程会更容易?我不再维护这个代码库,但这个问题仍然是一个未解决的大问题。

更新#2:假设的澄清

CompletableFuture 根据https://www.geeksforgeeks.org/completablefuture-in-java/表示异步计算的未来结果...有趣的是,JavaDocs不太清楚。

更新#3:

性能不是问题的重点,只有效率,具体意义上来说:如果任务 1 需要 T1 完成,任务 2 需要 T2 完成,那么等待总时间完成max(T1,T2)比等待时间更有效率T1+T2。然后将其扩展到N一定大小的任务列表T, U, V, ...,它可能看起来像max(T1..Tn)+max(U1..Un)+max(V1..Vn)或希望如此max(T1..Vn)。假设这里所有列表的 N 相同只是一种解释性简化(实际上应该是I, J, K...)。换句话说,请假设列表流中表示的异步任务是 I/O 限制的,而不是 CPU 限制的。Thread.sleep()如果您想在代码中演示某些内容,可以通过插入随机数来模拟。对于草率的符号表示歉意 - 我有计算机科学背景,但自从我尝试正式描述这样的问题以来已经有一段时间了。

更新#4

根据@Slaw的回答,我可以看到我面临的核心问题是流到达顺序(通过提供期货的顺序Stream)和未来完成顺序CompletableFuture(每个完成执行和解锁的顺序)之间的不匹配其中List)。因此,针对这个问题的新修订的 TL;DR是:如何获取Stream以任意顺序生成的 a,并将其重新排序为 future 执行完成顺序,以便操作flatMap()不会不必要地阻塞?

Hol*_*ger 4

这是一个解决方案,它不仅通过优先处理已经完成的 future 来避免阻塞,甚至还尽可能保留对原始流的延迟处理。只要已经遇到的 future 中有已完成的 future,它就不会在源遍历中前进。

\n

这可以通过使用无限源的示例得到最好的证明,该源仍然可以在有限的时间内完成(并且由于对已完成的 future 的偏好而相当快)。

\n
Stream<CompletableFuture<List<Integer>>> streamOfFutures = Stream.generate(\n    () -> CompletableFuture.supplyAsync(\n        () -> ThreadLocalRandom.current().ints(10, 0, 200).boxed().toList(),\n        CompletableFuture.delayedExecutor(\n            ThreadLocalRandom.current().nextLong(5), TimeUnit.SECONDS))\n);\nSystem.out.println(flattenResults(streamOfFutures)\n    .peek(System.out::println)\n    .anyMatch(i -> i == 123)\n);\n
Run Code Online (Sandbox Code Playgroud)\n

该实现将在遇到时立即处理已经完成的 future。仅当未来尚未完成时,排队操作才会被链接,并且挂起的计数器会增加。即使在异常完成的情况下也必须注意减少计数器并将项目(空列表)排队,以解除消费者线程的阻塞,以防它\xe2\x80\x99s在此时获取元素。遇到异常时,异常将传播给调用者。与短路并行流一样,如果在处理所有元素之前找到结果,则可能会错过错误。

\n

如果终端操作是短路的并且在没有处理整个流的情况下完成,则计数器是无关的,并且操作将不会等待待处理的 future 的完成。仅当源流已完全遍历时,计数器才与检测所有 future 何时完成相关。

\n
static <T> Stream<T> flattenResults(Stream<CompletableFuture<List<T>>> stream) {\n    Spliterator<CompletableFuture<List<T>>> srcSp = stream.spliterator();\n    BlockingQueue<List<T>> queue = new LinkedBlockingQueue<>();\n\n    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(\n                                                 srcSp.estimateSize(), 0) {\n        final AtomicLong pending = new AtomicLong();\n        Spliterator<T> fromCurrentList;\n        Throwable failure;\n\n        @Override\n        public boolean tryAdvance(Consumer<? super T> action) {\n            if(checkExisting(action)) return true;\n\n            while(srcSp.tryAdvance(this::checkNew)) {\n                if(checkExisting(action)) return true;\n            }\n\n            return checkAfterSourceExhausted(action);\n        }\n        private boolean checkExisting(Consumer<? super T> action) {\n            for(;;) {\n                var sp = fromCurrentList;\n                if(sp == null) {\n                    List<T> newList = queue.poll();\n                    if(newList == null) return false;\n                    fromCurrentList = sp = newList.spliterator();\n                }\n                if(sp.tryAdvance(action)) return true;\n                fromCurrentList = null;\n            }\n        }\n\n        private void checkNew(CompletableFuture<List<T>> f) {\n            if(f.isDone()) fromCurrentList = f.join().spliterator();\n            else {\n                pending.incrementAndGet();\n                f.whenComplete((r, t) -> {\n                    if(t != null) {\n                        failure = t;\n                        r = List.of();\n                    }\n                    queue.offer(r);\n                    pending.decrementAndGet();\n                });\n            }\n        }\n\n        private boolean checkAfterSourceExhausted(Consumer<? super T> action) {\n            while(pending.get() != 0 || !queue.isEmpty()) {\n                checkFailure();\n                try {\n                    List<T> newList = queue.take();\n                    fromCurrentList = newList.spliterator();\n                    if(checkExisting(action)) return true;\n                } catch(InterruptedException ex) {\n                    throw new CompletionException(ex);\n                }\n            }\n            return false;\n        }\n\n        private void checkFailure() {\n            Throwable t = failure;\n            if(t != null) {\n                if(t instanceof RuntimeException rt) throw rt;\n                if(t instanceof Error e) throw e;\n                throw new CompletionException(t);\n            }\n        }\n    }, false);\n}\n
Run Code Online (Sandbox Code Playgroud)\n
\n

你可以使用类似的东西

\n
Stream<CompletableFuture<List<Integer>>> streamOfFutures = IntStream.range(0, 10)\n    .mapToObj(i -> \n      CompletableFuture.supplyAsync(\n        () -> IntStream.range(i * 10, (i + 1) * 10).boxed().toList(),\n        CompletableFuture.delayedExecutor(10 - i, TimeUnit.SECONDS)));\n\nSystem.out.println(flattenResults(streamOfFutures)\n    .peek(System.out::println)\n    .anyMatch(i -> i == 34)\n);\n
Run Code Online (Sandbox Code Playgroud)\n

可视化\xe2\x80\x9c先完成,先处理\xe2\x80\x9d。或者将终端操作改为

\n
flattenResults(streamOfFutures).forEach(System.out::println);\n
Run Code Online (Sandbox Code Playgroud)\n

证明所有期货的完成均得到正确认可或

\n
if(flattenResults(streamOfFutures).count() != 100)\n    throw new AssertionError();\nelse\n    System.out.println("Success");\n
Run Code Online (Sandbox Code Playgroud)\n

拥有可以集成到自动化测试中的东西。

\n

  • 这是可能的,但不是简化。由于这是扁平化列表,因此返回“Stream&lt;CompletableFuture&lt;T&gt;&gt;”意味着将每个“T”包装在一个新的未来中。只有失败的 future 才会在源 future 和结果 future 之间存在一对一的关系。反之,请注意,代码在减少之前关心队列项目,并在检测到零后重新检查队列,这在两个操作的执行重叠时很重要。 (2认同)