假设我有一个处理器,当按下按钮时会发出一个布尔值,将其视为切换.
boolean gateValue = true;
PublishProcessor<Boolean> gate = PublishProcessor.create();
view.onButtonClicked()
.subscribe(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
gate.onNext(gateValue = !gateValue);
}
}));
Run Code Online (Sandbox Code Playgroud)
我想要做的是使用门的值来暂停和恢复可观察的序列,在暂停时缓冲发出的值.
我已经阅读了很多内容,虽然它似乎可以在其他语言的反应式扩展中使用,但RxJava似乎并不支持它.
这是我想要实现的一个例子,它只是每秒输出一个增量值.当我按下按钮时,我希望输出停止,直到我再次按下它,这将输出两次按下按钮之间发出的每个项目:
Flowable.interval(1, TimeUnit.SECONDS)
.bufferWhile(gate)
.flatMapIterable(longs -> longs)
.subscribe(new Consumer<Long>() {
@Override
public void accept(final Long aLong) throws Exception {
view.displayTime(aLong);
}
});
Run Code Online (Sandbox Code Playgroud)
有谁知道如何实现这样的事情?
编辑我写了一篇关于如何实现这个的博客文章https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk
我的代码中有很多Single
,比如
Disposable disp = Single.fromCallable(()-> loadData())
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(res-> showInUI(res),
throwable -> Log.e(TAG, throwable.getMessage()))
);
Run Code Online (Sandbox Code Playgroud)
当我从资料了解,之间的差别Observable
,并Single
是单可以响应错误,决不响应,有成功的回答它发射只有一次.现在我没有在任何地方处置,一切正常.
所以我需要执行disp.dispose()
吗?
我正在创建嵌套请求,如下所示(省略了一些错误处理):
return Single.create((SingleOnSubscribe<String>) emitter -> getPages()
.subscribe(pages -> getPageData(emitter, pages), emitter::onError))
.compose(applySchedulers());
// ...
private void getPageData(SingleEmitter<String> emitter, List<Page> pages) {
service.getPage(pages.get(0).id)
.subscribe(emitter::onSuccess, e -> {
pages.remove(0);
getPageData(emitter, pages);
});
}
Run Code Online (Sandbox Code Playgroud)
我曾经有一个迭代解决方案,它产生了相同的结果.页面列表按顺序排序,应按原样处理.如果连接良好,这部分代码可以正常工作,但是如果我碰巧遇到了错误的连接java.io.InterruptedIOException: thread interrupted
.解决这个问题的好方法是什么?
编辑:
堆栈跟踪:
W/System.err: java.io.InterruptedIOException: thread interrupted
W/System.err: at okio.Timeout.throwIfReached(Timeout.java:145)
W/System.err: at okio.Okio$2.read(Okio.java:136)
W/System.err: at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:429)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
W/System.err: at okio.InflaterSource.refill(InflaterSource.java:101)
W/System.err: at okio.InflaterSource.read(InflaterSource.java:62)
W/System.err: at okio.GzipSource.read(GzipSource.java:80)
W/System.err: at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err: at okio.ForwardingSource.read(ForwardingSource.java:35)
W/System.err: at …
Run Code Online (Sandbox Code Playgroud) 我的应用程序中有2个服务端点,a
并且b
(两个都是“ Singles”)。服务请求b
取决于的响应a
。在响应之后,b
我需要访问订户中的两个响应。我打算这样打电话:
services.a()
.flatMap(a -> services.b(a))
.subscribe(b ->
// Access a and b here
)
Run Code Online (Sandbox Code Playgroud)
但是通过这种方式,我只能访问b
订户中的结果。我又如何传递对此的回应a
?
我的第一个尝试是使用类似这样的东西:
// Note: Code will not compile... Just to show the concept
services.a()
.flatMap(
a -> Observable.combineLatest(
Observable.just(a)
services.b(a).toObservable()
Bifunction((a, b) -> {return Pair<ResponseA, ResponseB>(a, b)}))
)
.toSingle()
.subscribe(pair -> {
ResponseA a = pair.first();
ResponseB b = pair.second();
})
Run Code Online (Sandbox Code Playgroud)
但是随着用例变得更加复杂,代码将演变为丑陋的怪物。
我有以下方法:
public class ParentalControlInteractor {
public Single<Boolean> isPinSet() {
return bamSdk.getPinManager().isPINSet();
}
}
Run Code Online (Sandbox Code Playgroud)
我想调用此函数运行一次,然后每分钟重复一次,直到无穷大但这看起来很笨拙:
parentalControlInteractor.isPinSet()
.subscribeOn(Schedulers.io())
.repeat(10000)
.timeout(1600,TimeUnit.MILLISECONDS)
.doOnError(throwable -> {
Timber.e(throwable,"Error getting if Pin is set");
throwable.printStackTrace();
})
.subscribe(isPinSet -> {
this.isPinSet = isPinSet;
Timber.d("Pin is set = " + isPinSet.toString());
});
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法呢?我正在使用RxJava2.此外,上述方法仅调用10000次.我想永远调用它,就像使用Handler.postDelayed()一样.
使用Android Studio 3.0 IDE在Kotlin中编写.zip Observable时遇到问题。
这是我的代码:
internal var updateStringEventsSubject = PublishSubject.create<String>()
internal var updateIntEventsSubject = PublishSubject.create<Int>()
internal var triggerSave = PublishSubject.create<Boolean>()
internal var triggerStopAndSave = PublishSubject.create<Boolean>()
internal var normalSaveTrigger = triggerSave.debounce(30, TimeUnit.SECONDS)
internal var trigger = Observable.merge(normalSaveTrigger, triggerStopAndSave)
private fun saveEvents(
strings: List<String>,
integers: List<Int>,
someBoolean: Boolean): Boolean {
return true //doesn't matter for now
}
fun temp() {
Observable.zip<List<String>, List<Int>, Boolean, Boolean>(updateStringEventsSubject.buffer(trigger),
updateIntEventsSubject.buffer(trigger),
trigger, { strings: List<String>, integers: List<Int>, someBoolean: Boolean -> saveEvents(strings, integers, someBoolean) })
.subscribe()
}
Run Code Online (Sandbox Code Playgroud)
但是,我的IDE(Android …
我尝试在MV活动中应用MVVM模式(我是Android菜鸟)。
我将Room与RxJava 2一起使用,例如,这是我的存储库中方法的签名:
public Single<MissionTask> getMissionTaskByID(long id) {..}
Run Code Online (Sandbox Code Playgroud)
在我的ViewModel类中,我对存储库和代码有如下引用:
private void doSomethingOnUserEvent() {
...
missionTaskRepository.getMissionTaskByID(firstID).
observeOn(AndroidSchedulers.mainThread()).
subscribeOn(Schedulers.io()).
subscribe(missionTask ->
{
// do some work and update live data
},
t -> {
// handle error
});
...
}
Run Code Online (Sandbox Code Playgroud)
到目前为止,一切似乎都很好。现在- subscribe
返回Disposable
。
我的问题是:
在我经历的一些示例中,没有处理Disposable。
更新:我已经在android-architecture-components中看到了复合一次性材料的用法。
谢谢。
假设我有一个事件发射数据源,我想将其转换为反应流.数据源由资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅.使用单个observable replay
(对于新订户立即获得当前值),refCount
运营商似乎非常适合这种情况.例如,这是他的MyDataProvider
单身人士的样子:
private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
// Open my resource here and emit data into observable
})
.doOnDispose(() -> {
// Close my resource here
})
.replay(1)
.refCount();
public Observable<MyData> getMyDataObservable() {
return myDataObservable;
}
Run Code Online (Sandbox Code Playgroud)
但是,现在假设我有另一个数据源需要第一个数据源的结果来计算自己的值:
private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
.flatMap(myData -> {
// Call another data source and return the result here
})
public Observable<AnotherData> getAnotherDataObservable() {
return anotherDataObservable;
}
Run Code Online (Sandbox Code Playgroud)
在这里,我的设置开始崩溃.第一个可观测量的多播仅适用于refCount
操作员.在那之后,一切都再次单播.这意味着如果进行两次单独的订阅anotherDataProvider
,flatMap
则将调用operator两次.我看到了两个解决方法,但我不喜欢这两个:
我是android的新手,我有一个场景,我想从多个api获取数据.假设api_a,api_b,api_c,api_d.这些api彼此独立,但我想在混合Recycler View(水平和垂直)中显示来自这些api的数据.所以我想以这样的方式进行这些api调用,以便我可以一次获取每个api数据,以便我可以在回收器视图中显示.我已经使用了改造2,但为此我必须逐个链接它们,这是非常冗长的,我认为这不是一个可行的方法.我对RX JAVA知之甚少,但我只知道如何一次发出一个请求.请帮忙
我使用concatMap
长时间运行的操作一次处理一个项目流。在某些时候,我需要“中断”这个长时间运行的操作,但只针对当前项:
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
// Now I need to interrupt whichever longRunningOperation in
// progress, but I don't want to interrupt the whole stream.
// In other words, I want to force it to move onto the next
// integer.
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost …
Run Code Online (Sandbox Code Playgroud)