Rx 操作符的开头类似于combineLatest,但随后的行为类似于withLatestFrom

Sco*_*t H 5 reactive-programming rx-swift reactivex

我正在寻找一种运算符,它Observables通过在两者都Observables发出一个元素之前不发出任何内容来组合两者(类似于combineLatest),但随后仅通过将一个元素与Observable另一个最近发出的元素组合来发出元素Observable(类似于withLatestFrom)。结果看起来像这样(y可观察的是“控制”):

在此输入图像描述

这样的运营商存在吗?

fla*_*kes 3

我用 Java 解决了这个问题,但同样的理论也应该适用于你。

你所拥有的实际上是两种基本模式;一个combineLatest值后跟withLatestFrom多个值。如果withLatestFrom首先触发,那么您想要跳过该combineLatest值。

首先创建withLatestFrom可观察的:

Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);
Run Code Online (Sandbox Code Playgroud)

接下来我们要创建combineLatest发出单个值的可观察对象。我们还想在wlf触发时停止这个可观察的:

Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
    .take(1).takeUntil(wlf);
Run Code Online (Sandbox Code Playgroud)

最后将这两个可观察量添加在一起......为了方便起见,我创建了一个辅助方法来接受任意两个可观察量和一个双功能运算符:

public static <Result,
    Param1, Source1 extends Param1,
    Param2, Source2 extends Param2>
Observable<Result> combineThenLatestFrom(
    final Observable<Source1> o1,
    final Observable<Source2> o2,
    final BiFunction<Param1, Param2, Result> f
) {
    final Observable<Result> base = o1
        .withLatestFrom(o2, f::apply);
    return Observable
        .combineLatest(o1, o2, f::apply)
        .take(1).takeUntil(base)
        .mergeWith(base);
}
Run Code Online (Sandbox Code Playgroud)

这是我用来验证该方法的测试代码:

public static void main(final String[] args) {
    final TestScheduler scheduler = new TestScheduler();
    final TestSubject<String> o1 = TestSubject.create(scheduler);
    final TestSubject<String> o2 = TestSubject.create(scheduler);
    final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
    r.subscribe(System.out::println);
    o1.onNext("1");
    o1.onNext("2");
    o2.onNext("A");
    o2.onNext("B");
    o2.onNext("C");
    o2.onNext("D");
    o1.onNext("3");
    o2.onNext("E");
    scheduler.triggerActions();
}
Run Code Online (Sandbox Code Playgroud)

哪个输出:

2A
3D
Run Code Online (Sandbox Code Playgroud)