RxJava - 与switchMap()运算符相反?

tmn*_*tmn 8 reactive-programming rx-java

我想知道是否有办法组合现有的运营商来执行与a相反的操作switchMap().

switchMap()收到最后的发射后会追逐并取消所有Observable先前被执行.假设我翻了它,我想忽略所有排放给xxxMap()运营商的排放,因为它忙于收到的第一个排放.它将继续忽略排放,直到它完成在其Observable内部发射电流.然后它将处理它接收的下一个发射.

Observable.interval(1, TimeUnit.SECONDS)
        .doOnNext(i -> System.out.println("Source Emitted Value: " + i))
        .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) 
        .subscribe(i -> System.out.println("Subscriber received Value: " + i));
Run Code Online (Sandbox Code Playgroud)

有没有办法实现这个目标?在上面的例子,如果intensiveProcess()是持续三秒钟后,ignoreWhileBusyMap()会处理0,但可能忽略了排放12来自哪里interval().它会再处理3,但可能忽略45,等等...

aka*_*okd 6

当然,通过处理完成后设置的布尔来处理值的处理:

AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
    if (gate.get()) {
        gate.set(false);

        return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
                .doAfterTerminate(() -> gate.set(true));
    } else {
        return Observable.empty();
    }
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

编辑

替代方案:

Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

您可以更改onBackpressureDroponBackpressureLatest立即继续使用最新值.

  • 谢谢你的替代方案.我能够将它组成一个Kotlin的扩展函数:`inline fun <T,R> Observable <T> .ignoreWhileBusyMap(crossinline mapper:(T) - > Observable <R>)= onBackpressureDrop().flatMap({mapper .invoke(it)},1)` (2认同)

Bou*_*udi 5

我知道这是一个旧线程,但目前有一个 RxJs 运算符可以做到这一点。

运营商是exhaustMap

根据文档

ExhaustMap 将每个源值投影到一个 Observable 中,只有在前一个投影的 Observable 已完成时,该 Observable 才会合并到输出 Observable 中。

文档示例

import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  exhaustMap(ev => interval(1000).pipe(take(5)))
);
result.subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)

  • ExhaustMap 在 RxJava 中仍然不存在,这正是最初提出的问题。我很失望 RxJava 没有它,来自 RxJs。 (2认同)