如何在流中将 1 个可完成的未来划分为多个可完成的未来?

mys*_*eim 6 java java-8 java-stream completable-future

例如我有这样的方法:

public CompletableFuture<Page> getPage(int i) {
    ...
}
public CompletableFuture<Document> getDocument(int i) {
    ...
}
public CompletableFuture<Void> parseLinks(Document doc) {
    ...
}
Run Code Online (Sandbox Code Playgroud)

而我的流程:

List<CompletableFuture> list = IntStream
    .range(0, 10)
    .mapToObj(i -> getPage(i))

    // I want method like this:
    .thenApplyAndSplit(CompletableFuture<Page> page -> {
        List<CompletableFuture<Document>> docs = page.getDocsId()
            .stream()
            .map(i -> getDocument(i))
            .collect(Collectors.toList());
        return docs;
    })
    .map(CompletableFuture<Document> future -> {
        return future.thenApply(Document doc -> parseLink(doc);
    })
    .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

它应该类似于flatMap()for CompletableFuture,所以我想实现这个流程:

List<Integer> -> Stream<CompletableFuture<Page>>
              -> Stream<CompletableFuture<Document>>
              -> parse each
Run Code Online (Sandbox Code Playgroud)

更新

Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> {
    // How to return stream of Document when page finishes?
    // page.thenApply( ... )
})
Run Code Online (Sandbox Code Playgroud)

use*_*547 0

您真的必须使用 Streams 吗?你不能只是把一些依赖行为放在你的吗CompletableFutures?特别是自从你最后一次调用返回后CompletableFutures<Void>(当然,也可以使用Collection.forEach

List<CompletableFuture<Page>> completableFutures = IntStream
      .range(0, 10)
      .mapToObj(i -> getPage(i)).collect(Collectors.toList());

for (CompletableFuture<Page> page : completableFutures) {
    page.thenAccept(p -> {
        List<Integer> docsId = p.getDocsId();
        for (Integer integer : docsId) {
            getDocument(integer).thenAccept(d-> parseLinks(d));
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

编辑:好吧,所以我又做了一次尝试,但我不确定这是否是一个好主意,因为我不是CompletableFuture.

使用以下方法(也许可以有更好的实现):

public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream){
    return CompletableFuture.supplyAsync( ()->
        stream.map(CompletableFuture::join)
    );
}


Stream<CompletableFuture<Page>> pagesCFS = IntStream
        .range(0, 10)
        .mapToObj(i -> getPage(i));

CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS);

CompletableFuture<Stream<Document>> docCF= 
   pageCF.thenCompose(a ->
        flatMapCF(a.flatMap(
                b -> b.getDocsId()
                        .stream()
                        .map(c -> getDocument(c))
        )));
Run Code Online (Sandbox Code Playgroud)

问题可能是,只有CompletableFuture当所有结果都可用时才返回