假设我有一个处理器,当按下按钮时会发出一个布尔值,将其视为切换.
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
Run Code Online (Sandbox Code Playgroud)
我想要做的是使用门的值来暂停和恢复可观察的序列,在暂停时缓冲发出的值.
我已经阅读了很多内容,虽然它似乎可以在其他语言的反应式扩展中使用,但RxJava似乎并不支持它.
这是我想要实现的一个例子,它只是每秒输出一个增量值.当我按下按钮时,我希望输出停止,直到我再次按下它,这将输出两次按下按钮之间发出的每个项目:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
Run Code Online (Sandbox Code Playgroud)
有谁知道如何实现这样的事情?
编辑我写了一篇关于如何实现这个的博客文章https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
我的代码中有很多Single
,比如
Disposable disp = Single.fromCallable(()-> loadData())
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(res-> showInUI(res),
throwable -> Log.e(TAG, throwable.getMessage()))
);
Run Code Online (Sandbox Code Playgroud)
当我从资料了解,之间的差别Observable
,并Single
是单可以响应错误,决不响应,有成功的回答它发射只有一次.现在我没有在任何地方处置,一切正常.
所以我需要执行disp.dispose()
吗?
我正在创建嵌套请求,如下所示(省略了一些错误处理):
return Single.create((SingleOnSubscribe<String>) emitter -> getPages()
.subscribe(pages -> getPageData(emitter, pages), emitter::onError))
.compose(applySchedulers());
// ...
private void getPageData(SingleEmitter<String> emitter, List<Page> pages) {
service.getPage(pages.get(0).id)
.subscribe(emitter::onSuccess, e -> {
pages.remove(0);
getPageData(emitter, pages);
});
}
Run Code Online (Sandbox Code Playgroud)
我曾经有一个迭代解决方案,它产生了相同的结果.页面列表按顺序排序,应按原样处理.如果连接良好,这部分代码可以正常工作,但是如果我碰巧遇到了错误的连接java.io.InterruptedIOException: thread interrupted
.解决这个问题的好方法是什么?
编辑:
堆栈跟踪:
W/System.err: java.io.InterruptedIOException: thread interrupted
W/System.err: at okio.Timeout.throwIfReached(Timeout.java:145)
W/System.err: at okio.Okio$2.read(Okio.java:136)
W/System.err: at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:429)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
W/System.err: at okio.InflaterSource.refill(InflaterSource.java:101)
W/System.err: at okio.InflaterSource.read(InflaterSource.java:62)
W/System.err: at okio.GzipSource.read(GzipSource.java:80)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okio.ForwardingSource.read(ForwardingSource.java:35)
W/System.err: at …
Run Code Online (Sandbox Code Playgroud) 我的应用程序中有2个服务端点,a
并且b
(两个都是“ Singles”)。服务请求b
取决于的响应a
。在响应之后,b
我需要访问订户中的两个响应。我打算这样打电话:
services.a()
.flatMap(a -> services.b(a))
.subscribe(b ->
// Access a and b here
)
Run Code Online (Sandbox Code Playgroud)
但是通过这种方式,我只能访问b
订户中的结果。我又如何传递对此的回应a
?
我的第一个尝试是使用类似这样的东西:
// Note: Code will not compile... Just to show the concept
services.a()
.flatMap(
a -> Observable.combineLatest(
Observable.just(a)
services.b(a).toObservable()
Bifunction((a, b) -> {return Pair<ResponseA, ResponseB>(a, b)}))
)
.toSingle()
.subscribe(pair -> {
ResponseA a = pair.first();
ResponseB b = pair.second();
})
Run Code Online (Sandbox Code Playgroud)
但是随着用例变得更加复杂,代码将演变为丑陋的怪物。
我有以下方法:
public class ParentalControlInteractor {
public Single<Boolean> isPinSet() {
return bamSdk.getPinManager().isPINSet();
}
}
Run Code Online (Sandbox Code Playgroud)
我想调用此函数运行一次,然后每分钟重复一次,直到无穷大但这看起来很笨拙:
parentalControlInteractor.isPinSet()
.subscribeOn(Schedulers.io())
.repeat(10000)
.timeout(1600,TimeUnit.MILLISECONDS)
.doOnError(throwable -> {
Timber.e(throwable,"Error getting if Pin is set");
throwable.printStackTrace();
})
.subscribe(isPinSet -> {
this.isPinSet = isPinSet;
Timber.d("Pin is set = " + isPinSet.toString());
});
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法呢?我正在使用RxJava2.此外,上述方法仅调用10000次.我想永远调用它,就像使用Handler.postDelayed()一样.
尽管调试rxJava网络电话在一个应用程序我碰到这样一种情况,如果我们dispose
或clear
处置对象通过链的订阅返回observables
那么只有第一个observable
得到处理而不是其他链接observables
的flatMap
.
看看下面的演示代码片段:
CompositeDisposable testCompositeDisposal = new CompositeDisposable();
private void testLoadData() {
Disposable disposable = Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "First: " + i);
sbr.onNext(true);
}
sbr.onComplete();
}).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
for (int i = 0; i < 5; i++) {
Thread.sleep(3000);
Log.w("Debug: ", "Second: " + i);
sbr.onNext(true);
}
sbr.onComplete();
})).doOnNext(value -> {
Log.w("Debug: ", "doONNext"); …
Run Code Online (Sandbox Code Playgroud) 使用Android Studio 3.0 IDE在Kotlin中编写.zip Observable时遇到问题。
这是我的代码:
internal var updateStringEventsSubject = PublishSubject.create<String>()
internal var updateIntEventsSubject = PublishSubject.create<Int>()
internal var triggerSave = PublishSubject.create<Boolean>()
internal var triggerStopAndSave = PublishSubject.create<Boolean>()
internal var normalSaveTrigger = triggerSave.debounce(30, TimeUnit.SECONDS)
internal var trigger = Observable.merge(normalSaveTrigger, triggerStopAndSave)
private fun saveEvents(
strings: List<String>,
integers: List<Int>,
someBoolean: Boolean): Boolean {
return true //doesn't matter for now
}
fun temp() {
Observable.zip<List<String>, List<Int>, Boolean, Boolean>(updateStringEventsSubject.buffer(trigger),
updateIntEventsSubject.buffer(trigger),
trigger, { strings: List<String>, integers: List<Int>, someBoolean: Boolean -> saveEvents(strings, integers, someBoolean) })
.subscribe()
}
Run Code Online (Sandbox Code Playgroud)
但是,我的IDE(Android …
在阅读了多篇博客文章和文档后,我得出结论,以下doOnSubscribe
将在工作线程上执行:
Observable.just(1)
.observeOn(Schedulers.io())
.doOnSubscribe(__ -> Log.d("Testing", "Testing")) // Shouldn't this be on worker thread?
.subscribe();
Run Code Online (Sandbox Code Playgroud)
但是在调试之后,我看到doOnSubscribe
在主线程上执行了.我认为doOnSubscribe
它与其他运算符类似,因此在与subscribeOn
和相结合时具有类似的线程行为observeOn
.
我错过了什么?如何将doOnSubscribe
执行移至后台线程?
假设我有一个事件发射数据源,我想将其转换为反应流.数据源由资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅.使用单个observable replay
(对于新订户立即获得当前值),refCount
运营商似乎非常适合这种情况.例如,这是他的MyDataProvider
单身人士的样子:
private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
// Open my resource here and emit data into observable
})
.doOnDispose(() -> {
// Close my resource here
})
.replay(1)
.refCount();
public Observable<MyData> getMyDataObservable() {
return myDataObservable;
}
Run Code Online (Sandbox Code Playgroud)
但是,现在假设我有另一个数据源需要第一个数据源的结果来计算自己的值:
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
.flatMap(myData -> {
// Call another data source and return the result here
})
public Observable<AnotherData> getAnotherDataObservable() {
return anotherDataObservable;
}
Run Code Online (Sandbox Code Playgroud)
在这里,我的设置开始崩溃.第一个可观测量的多播仅适用于refCount
操作员.在那之后,一切都再次单播.这意味着如果进行两次单独的订阅anotherDataProvider
,flatMap
则将调用operator两次.我看到了两个解决方法,但我不喜欢这两个: