如何通过将两个 Flux 中的值配对到一个元组中来组合发布者?

Kay*_*ayV 2 java publisher publish-subscribe project-reactor reactive-streams

假设我有两个 Flux,如下所示:

    Flux<Integer> f1 = Flux.just(10,20,30,40);
    Flux<Integer> f2 = Flux.just(100,200,300,400);
Run Code Online (Sandbox Code Playgroud)

现在我想要的是将这些通量组合成单个通量或两个通量的元组,这将在单个通量中具有两个通量的元素。

我使用 zipwith 方法尝试了以下操作:

  Flux<Integer, Integer> zipped = f1.zipWith(f2,
                                            (one, two) ->  one + "," +two)
                                    .subscribe();
Run Code Online (Sandbox Code Playgroud)

但这会产生编译时错误:

 Incorrect number of arguments for type Flux<T>; it cannot be parameterized with arguments <Integer, Integer>
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?请建议。

Jer*_*eke 5

Flux 只有一个类型参数,所以这Flux<Integer,Integer>是不可能的,我不确定你想用 来实现什么one + "," + two,但这个表达式的类型是 String。

因此,本质上,您将两个整数映射到一个字符串,因此 的类型zipped应该是Flux<String>.

或者,您可以映射到您自己制作的特殊元组类(或者可能来自您正在使用的库):

public class Pair<A,B> {
    private final A first;

    private final B second;

    public Pair(A first, B second) {
        this.first = first;
        this.second = second;
    }

    public A getFirst() {
        return first;
    }

    public B getSecond() {
        return second;
    }
}
Run Code Online (Sandbox Code Playgroud)

然后您可以按如下方式映射:

Flux<Integer> f1 = Flux.just(10,20,30,40);
Flux<Integer> f2 = Flux.just(100,200,300,400);

Flux<Pair<Integer,Integer>> result = f1.zipWith(f2,
    (one, two) ->  new Pair<>(one, two));
Run Code Online (Sandbox Code Playgroud)

或者甚至更短:

Flux<Pair<Integer,Integer>> result = f1.zipWith(f2, Pair::new);
Run Code Online (Sandbox Code Playgroud)