tmn*_*tmn 8 reactive-programming rx-java
我想知道是否有办法组合现有的运营商来执行与a相反的操作switchMap().
在switchMap()收到最后的发射后会追逐并取消所有Observable先前被执行.假设我翻了它,我想忽略所有排放给xxxMap()运营商的排放,因为它忙于收到的第一个排放.它将继续忽略排放,直到它完成在其Observable内部发射电流.然后它将处理它接收的下一个发射.
Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(i -> System.out.println("Source Emitted Value: " + i))
.ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation()))
.subscribe(i -> System.out.println("Subscriber received Value: " + i));
Run Code Online (Sandbox Code Playgroud)
有没有办法实现这个目标?在上面的例子,如果intensiveProcess()是持续三秒钟后,ignoreWhileBusyMap()会处理0,但可能忽略了排放1和2来自哪里interval().它会再处理3,但可能忽略4和5,等等...
当然,通过处理完成后设置的布尔来处理值的处理:
AtomicBoolean gate = new AtomicBoolean(true);
Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
if (gate.get()) {
gate.set(false);
return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
.doAfterTerminate(() -> gate.set(true));
} else {
return Observable.empty();
}
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
编辑
替代方案:
Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
您可以更改onBackpressureDrop为onBackpressureLatest立即继续使用最新值.
我知道这是一个旧线程,但目前有一个 RxJs 运算符可以做到这一点。
运营商是exhaustMap。
根据文档:
ExhaustMap 将每个源值投影到一个 Observable 中,只有在前一个投影的 Observable 已完成时,该 Observable 才会合并到输出 Observable 中。
文档示例:
import { fromEvent, interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
exhaustMap(ev => interval(1000).pipe(take(5)))
);
result.subscribe(x => console.log(x));
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
792 次 |
| 最近记录: |