如何在 Flux 中使用 zipWith 迭代元组?

mem*_*und 4 java spring reactor spring-webflux

我想迭代提供的基本列表referenceIds。然后在下一步中,我想进行多个服务调用并将结果聚合到一个元组中。

然后作为最后一步,我想迭代我的所有referenceIds,并返回JSONObject通过事件流直接生成的每个。

@GetMapping(value = "/test", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<JSONObject> test(Integer pages) {
    pages = 1;
    return Flux.range(1, pages)
            .map(pageNumber -> Arrays.asList(1 * pageNumber, 2 * pageNumber, 3 * pageNumber)) //referenceIds for testing
            .flatMap(numbers -> Flux.fromIterable(numbers).zipWith(Mono.zip(a(numbers), b(numbers)))) //results based on reference ids
            .map(tuple -> {
                Integer number = tuple.getT1();
                Tuple2<List<String>, List<String>> lookup = tuple.getT2();

                JSONObject json = new JSONObject();
                json.put("number", number);
                json.put("someMore", <fromLookup>);
                return json;
            });
}
Run Code Online (Sandbox Code Playgroud)

对于这个例子,方法的返回类型a()并不b()重要。重要的部分是:

如果我回来Flux.fromIterable(numbers);一切正常。但是当使用聚合时.zipWith(),我只收到数字列表的第一个元素。其他人都迷失了。为什么?

旁注:我需要用来.zipWith()并行执行这些方法调用(一些运行时间较长的事务)。

Vla*_*aev 5

其他人都迷失了。为什么?

来自 Flux 类文档:

将这个 {@link Flux} 与另一个 {@link Publisher} 源压缩在一起,也就是说,等待两者发出一个元素并将这些元素一次组合成一个 {@link Tuple2}。操作员将继续这样做,直到任何源完成。

您可以执行 a() 和 b() 调用,压缩结果,然后将numbers列表展开到 Flux 并添加结果,如下所示:

   .flatMap(numbers -> Mono.zip(a(numbers), b(numbers))
                .flatMapMany(tuple -> Flux.fromIterable(numbers).map(i -> Tuples.of(i,tuple))))
Run Code Online (Sandbox Code Playgroud)