在这个博客中,他给出了这个(复制/粘贴以下代码)回调地狱的例子.但是,没有提到如何使用Reactive Extensions消除该问题.
所以这里F3取决于F1完成,F4和F5取决于F2完成.
注意:我目前正试图绕过Rx,所以在问这个问题之前我没有尝试解决这个例子.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class CallbackB {
/**
* Demonstration of nested callbacks which then need to composes their responses together.
* <p>
* Various different approaches for composition can be done but eventually they end up relying upon
* synchronization techniques such as the CountDownLatch used here or converge on callback design
* changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
*/ …Run Code Online (Sandbox Code Playgroud) 我的问题是我无法获得无限的流Retrofit.在我获得初始poll()请求的凭证后 - 我做了初始poll()请求.如果没有更改,则每个poll()请求在25秒内响应,如果有任何更改,则每个poll()请求更早 - 返回changed_data [].每个响应都包含timestamp下一个轮询请求所需的数据 - 我应该在每个poll()响应之后执行新的poll()请求.这是我的代码:
getServerApi().getLongPollServer()
.flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "")
.take(1)
.flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), "")))
.retry()
.subscribe(longPollEnvelope1 -> {
processUpdates(longPollEnvelope1.getUpdates());
});
Run Code Online (Sandbox Code Playgroud)
我是RxJava的新手,也许我不明白,但我无法获得无限的流.我接到3个电话,然后是onNext和onComplete.
PS也许有更好的解决方案在Android上实现长轮询?
我正在使用RxAndroid进行流操作.在我的实际用例中,我从服务器获取列表(使用Retrofit).我正在使用调度程序在后台线程上完成工作,并在Android UI(主)线程上获得最终发射.
这适用于网络调用,但是我意识到网络调用后我的运算符不使用后台线程,而是在主线程上调用.
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> {});
Run Code Online (Sandbox Code Playgroud)
如何确保所有操作都在后台线程上执行?
我从Android的角度来问这个问题,但这应该适用于RxJava.
作为最佳实践,应我的观点始终处分甚至短暂的Completable,Single,Maybe和终止Observable的Rx类型应该在短期内结束,但是当用户关闭该视图可以仍在执行?我知道当Rx链终止时,它会被丢弃,但这可能会在视图关闭后的某个时候发生.
例如,Single正在执行HTTP GET.调用将完成,但可能是在视图被破坏后,暂时阻止垃圾回收.
如果a CompositeDisposable用于Disposable长期观察收集这些s,我会认为应该注意clear()或Disposable定期删除这些s以防止无限大小的增长CompositeDisposable?
我试图在全局级别的应用程序中处理异常,因此改造会抛出一个错误,我会在一些特定的类中捕获它,并使用逻辑来处理这些错误.
我有一个界面
@POST("/token")
AuthToken refreshToken(@Field("grant_type") String grantType, @Field("refresh_token") String refreshToken);
Run Code Online (Sandbox Code Playgroud)
和可观察的
/**
* Refreshes auth token
*
* @param refreshToken
* @return
*/
public Observable<AuthToken> refreshToken(String refreshToken) {
return Observable.create((Subscriber<? super AuthToken> subscriber) -> {
try {
subscriber.onNext(apiManager.refreshToken(REFRESH_TOKEN, refreshToken));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}).subscribeOn(Schedulers.io());
}
Run Code Online (Sandbox Code Playgroud)
当我从服务器获得401(无效令牌或其他一些网络相关的错误)时,我想刷新令牌并重复其余的呼叫.是否有一种方法可以使用rxjava对所有其余的调用使用某种可观察的全局捕获此错误,处理它并重复抛出它的调用?
现在我使用subject来捕获.subscribe()这样的错误
private static BehaviorSubject errorEvent = BehaviorSubject.create();
public static BehaviorSubject<RetrofitError> getErrorEvent() {
return errorEvent;
}
Run Code Online (Sandbox Code Playgroud)
并在一些电话中
getCurrentUser = userApi.getCurrentUser().observeOn(AndroidSchedulers.mainThread())
.subscribe(
(user) -> {
this.user = user;
},
errorEvent::onNext
);
Run Code Online (Sandbox Code Playgroud)
然后在我的主要活动中,我订阅该行为主题并解析错误 …
在我的上一个项目中,我使用rxJava并且我意识到observable.doOnError('onErrorCallback').subscribe(action)并observable.subscribe(action, 'onErrorCallback')以不同的方式运行.即使是从文档中我也不明白它们之间的区别是什么,以及何时应该使用第一个和第二个变体.
我有以下方法使用otto和发布对UI的响应AsyncTask.
private static void onGetLatestStoryCollectionSuccess(final StoryCollection storyCollection, final Bus bus) {
new AsyncTask<Void, Void, Void>() {
@Override
protected Void doInBackground(Void... params) {
bus.post(new LatestStoryCollectionResponse(storyCollection));
return null;
}
}.execute();
}
Run Code Online (Sandbox Code Playgroud)
我需要帮助将其转换AsyncTask为RxJava使用RxAndroid库.
我在其中一项活动中遇到了一个奇怪的问题.当从拍摄照片/视频回来时,onActivityResult我正在显示一个对话框,让用户为摄像机命名.一旦用户按下OK,我就会发送onNext()一个主题,该主题具有复制文件的请求文件名(并显示进度对话框).
由于某种原因map(),即使我打电话,也总是在主线程上调用执行复制的函数subscribeOn(Schedulers.io()).
@Override
protected void onActivityResult(final int requestCode, int resultCode, Intent intent) {
...
final PublishSubject<String> subject = PublishSubject.create();`
mSubscription = subject
.subscribeOn(Schedulers.io())
.map(new Func1<String, String>() {
@Override
public String call(String fileName) {
Log.I.d(TAG,"map");
return doSomeIOHeavyFuncition();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(final String fullPath) {
Log.d(TAG,"onNext");
doSomethingOnUI(fullPath);
subject.onCompleted();
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
...
}
}, new Action0() {
@Override
public void …Run Code Online (Sandbox Code Playgroud) 我有2个API,我想按顺序请求并将它们的数据存储在SQLite中.
首先,我想向API发出请求A并将其数据存储在SQL表中a.然后请求API B并将其数据存储在表中,b并将一些数据存储在表中a_b.存储的数据a_b仅来自请求B.
我怎么能用RxJava做到这一点.我在这里读到了关于使用flatMap的地方,就像这样
apiService.A()
// store in DB here? How? maybe use map()?
.flatMap(modelA -> {
// or maybe store modelA in DB here?
return apiService.B().map(modelB -> {
storeInDB()l // store B here ?
return modelB;
});
});
Run Code Online (Sandbox Code Playgroud)
如果我没有使用lambda函数,这看起来就像普通的嵌套调用一样难看.这是一个更好的方法吗?
随着即将推出的RxJava2版本的一个重要变化是null不再被接受为流元素,即以下代码将抛出异常:Observable.just(null)
老实说,我对这种变化有着复杂的感觉,我的一部分理解它会强制执行干净的API,但是当这可能是一个问题时,我可以看到许多用例.
例如,在我的应用程序中,我有一个内存缓存:
@Nullable CacheItem findCacheItem(long id);
Run Code Online (Sandbox Code Playgroud)
CacheItem可能不存在于缓存中,因此方法可能返回null值.
它与Rx*一起使用的方式如下:
Observable<CacheItem> getStream(final long id) {
return Observable.fromCallable(new Callable<CacheItem>() {
@Override public CacheItem call() throws Exception {
return findCacheItem(id);
}
});
}
Run Code Online (Sandbox Code Playgroud)
所以使用这种方法,我可能在我的流中得到null,这是完全有效的情况,所以它在接收端正确处理 - 假设UI改变其状态,如果项目不在缓存中:
Observable.just(user)
.map(user -> user.getName())
.map(name -> convertNameToId(name))
.flatMap(id -> getStream(id))
.map(cacheItem -> getUserInfoFromCacheItem(cacheItem))
.subscribe(
userInfo -> {
if(userInfo != null) showUserInfo();
else showPrompt();
}
);
Run Code Online (Sandbox Code Playgroud)
使用RxJava2,我不再允许null在流中发布消息,因此我需要将我的CacheItem包装到其他类中,并使我的流生成该包装,或者进行相当大的体系结构更改.
将每个流元素包装成可空的对应元素对我来说并不合适.
我错过了一些基本的东西吗?
看起来像我这样的情况非常受欢迎,所以我很好奇在RxJava2中给出新的"无空"政策时,解决这个问题的建议策略是什么?
编辑 请参阅RxJava GitHub仓库中的后续对话
rx-java ×10
android ×6
java ×3
rx-android ×3
retrofit ×2
dispose ×1
future ×1
long-polling ×1
retrofit2 ×1
rx-java2 ×1