寻找一种干净的方法来转换源Observable以null在一段时间内不发出项目后发出单个(或标记值).
例如,如果源可观察的发射1, 2, 3然后在发射之前停止发射10秒,4, 5, 6我希望发射的项目是1, 2, 3, null, 4, 5, 6.
用例用于在UI中显示值,其中显示的值应变为破折号-或者N/A最后发出的值是陈旧/旧的.
我查看了timeout运算符,但它终止Observable了超时发生的时间,这是不可取的.
使用RxJava.
基于akarnokd 的回答和类似问题中的回答,另一种实现:
如果您正在寻找一个值来指示排放之间的时间间隔:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
Run Code Online (Sandbox Code Playgroud)
如果您希望在源 observable 在一段时间内不发射后继续接收值:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
Run Code Online (Sandbox Code Playgroud)
区别在于timeout可观察性以及它是否重复出现。
您可以替换-1使用null需要。
以上所有内容均使用 Java 使用 RxJava 1.0.17 进行测试1.8.0_72。