我正在努力了解RxJava.我的测试代码是:
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import java.util.concurrent.TimeUnit;
public class Hello {
public static void main(String[] args) {
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
Thread.sleep(1000);
subscriber.onNext("a");
Thread.sleep(1000);
subscriber.onNext("b");
Thread.sleep(1000);
subscriber.onNext("c");
Thread.sleep(1000);
subscriber.onNext("d");
Thread.sleep(1000);
subscriber.onNext("e");
Thread.sleep(1000);
subscriber.onNext("f");
Thread.sleep(1000);
subscriber.onNext("g");
Thread.sleep(1000);
subscriber.onNext("h");
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
});
observable
.delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<String>() {
@Override
public void call(String string) {
System.out.println(string);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
没有.delay(2, TimeUnit.SECONDS)我的输出:a b c d e f g h但是.delay(2, TimeUnit.SECONDS)
输出缺少"g"和"h":a b c d e f
怎么可能?文档说延迟只是发出源Observable发出的项目在时间上向前移动指定的延迟
Aar*_*ron 12
在delay您使用时间表超载在不同的线程,并导致一个隐含的比赛condition.All时间运营商合作(如delay,buffer和window)需要使用调度安排为后来的影响,这可能会导致意想不到的竞争条件,如果你不知道它并仔细使用它们.在这种情况下,延迟操作员在单独的线程池上调度下游工作.以下是测试中的执行顺序(在主线程上).
onNext("a")onNext("b")到延迟.延迟将"b"的onNext计划为2秒钟.onNext("h")它调度工作然后立即从subscribe返回并终止你的测试(导致计划的工作消失).为了让它以异步方式执行,您可以安排trampoline调度程序实现的延迟.
.delay(2, TimeUnit.SECONDS, Schedulers.trampoline())
Run Code Online (Sandbox Code Playgroud)