mys*_*eim 6 java java-8 java-stream completable-future
例如我有这样的方法:
public CompletableFuture<Page> getPage(int i) {
...
}
public CompletableFuture<Document> getDocument(int i) {
...
}
public CompletableFuture<Void> parseLinks(Document doc) {
...
}
Run Code Online (Sandbox Code Playgroud)
而我的流程:
List<CompletableFuture> list = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i))
// I want method like this:
.thenApplyAndSplit(CompletableFuture<Page> page -> {
List<CompletableFuture<Document>> docs = page.getDocsId()
.stream()
.map(i -> getDocument(i))
.collect(Collectors.toList());
return docs;
})
.map(CompletableFuture<Document> future -> {
return future.thenApply(Document doc -> parseLink(doc);
})
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
它应该类似于flatMap()
for CompletableFuture
,所以我想实现这个流程:
List<Integer> -> Stream<CompletableFuture<Page>>
-> Stream<CompletableFuture<Document>>
-> parse each
Run Code Online (Sandbox Code Playgroud)
更新
Stream<CompletableFuture<Page>> pagesCFS = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i));
Stream<CompletableFuture<Document>> documentCFS = listCFS.flatMap(page -> {
// How to return stream of Document when page finishes?
// page.thenApply( ... )
})
Run Code Online (Sandbox Code Playgroud)
您真的必须使用 Streams 吗?你不能只是把一些依赖行为放在你的吗CompletableFutures
?特别是自从你最后一次调用返回后CompletableFutures<Void>
(当然,也可以使用Collection.forEach
)
List<CompletableFuture<Page>> completableFutures = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i)).collect(Collectors.toList());
for (CompletableFuture<Page> page : completableFutures) {
page.thenAccept(p -> {
List<Integer> docsId = p.getDocsId();
for (Integer integer : docsId) {
getDocument(integer).thenAccept(d-> parseLinks(d));
}
});
}
Run Code Online (Sandbox Code Playgroud)
编辑:好吧,所以我又做了一次尝试,但我不确定这是否是一个好主意,因为我不是CompletableFuture
.
使用以下方法(也许可以有更好的实现):
public static <T> CompletableFuture<Stream<T>> flatMapCF(Stream<CompletableFuture<T>> stream){
return CompletableFuture.supplyAsync( ()->
stream.map(CompletableFuture::join)
);
}
Stream<CompletableFuture<Page>> pagesCFS = IntStream
.range(0, 10)
.mapToObj(i -> getPage(i));
CompletableFuture<Stream<Page>> pageCF = flatMapCF(pagesCFS);
CompletableFuture<Stream<Document>> docCF=
pageCF.thenCompose(a ->
flatMapCF(a.flatMap(
b -> b.getDocsId()
.stream()
.map(c -> getDocument(c))
)));
Run Code Online (Sandbox Code Playgroud)
问题可能是,只有CompletableFuture
当所有结果都可用时才返回