ReactiveX在超时后发出null或sentinel值

jen*_*ert 6 rx-java reactivex

寻找一种干净的方法来转换源Observablenull在一段时间内不发出项目后发出单个(或标记值).

例如,如果源可观察的发射1, 2, 3然后在发射之前停止发射10秒,4, 5, 6我希望发射的项目是1, 2, 3, null, 4, 5, 6.

用例用于在UI中显示值,其中显示的值应变为破折号-或者N/A最后发出的值是陈旧/旧的.

我查看了timeout运算符,但它终止Observable了超时发生的时间,这是不可取的.

使用RxJava.

Why*_*rrh 5

基于akarnokd 的回答类似问题中的回答,另一种实现:

单个哨兵值(根据 OP)

如果您正在寻找一个值来指示排放之间的时间间隔:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
Run Code Online (Sandbox Code Playgroud)

连续哨兵值

如果您希望在源 observable 在一段时间内不发射后继续接收值:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
Run Code Online (Sandbox Code Playgroud)

区别在于timeout可观察性以及它是否重复出现。

您可以替换-1使用null需要。

以上所有内容均使用 Java 使用 RxJava 1.0.17 进行测试1.8.0_72