我有一个调用onNext()一些UI事件的PublishSubject .订户通常需要2秒钟才能完成其工作.onNext()在订户忙时,我需要忽略除最后一个之外的所有呼叫.我尝试了以下,但是我无法控制流量.请求似乎排队等待,每个请求都得到处理(因此背压似乎不起作用).如何让它忽略所有请求,但最后一个?(我不想使用,debounce因为代码需要立即做出反应,任何合理的小超时都不起作用).
而且我意识到使用subscribeOn一个主题没有任何影响,因此我正在使用observeOn其中一个运算符进行异步工作.这是正确的方法吗?
Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....
Run Code Online (Sandbox Code Playgroud)
我看到的输出是:
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main
Run Code Online (Sandbox Code Playgroud)
相反,我希望代码执行以下操作(即加载一次,当它加载时,背压以阻止所有请求并发出最后一个请求,一旦第一个观察者完成 - 所以总的来说理想情况下应该加载两次最多):
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
Run Code Online (Sandbox Code Playgroud)
您无法执行此操作,observeOn因为它将缓冲至少1个元素,因此如果已经发生一次"加载",则始终执行"PRE-LOADING"阶段.
但是你可以这样做,delay因为它不会操纵链上的请求数量,并在调度程序上单独调度每个onNext而不自行排队:
public static void main(String[] args) throws Exception {
Subject<Boolean> loadingQueue =
PublishSubject.<Boolean>create().toSerialized();
loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.map(discarded -> {
// PRE-LOADING
System.out.println("PRE-LOADING: "
+ Thread.currentThread().getName());
return discarded;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <-------
.map(b -> {
System.out.println("LOADING: "
+ Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.rebatchRequests(1) // <----------------------------------- one-by-one
.subscribe(b -> {
System.out.println("FINISHED: "
+ Thread.currentThread().getName() + "\n\n");
});
loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
Thread.sleep(10000);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1072 次 |
| 最近记录: |