mrp*_*l89 6 rest android polling rx-java retrofit
我正在Android上构建一个2人游戏.游戏以转向方式工作,因此玩家1等待玩家2输入,反之亦然.我有一个网络服务器,我使用Slim Framework 运行API .在客户端我使用Retrofit.因此,在客户端,我想每隔X秒轮询一次我的网络服务器(我知道这不是最好的方法)来检查是否有来自玩家2的输入,如果是更改UI(游戏板).
处理Retrofit我遇到了RxJava.我的问题是弄清楚我是否需要使用RxJava?如果是,是否有任何非常简单的例子用于改造轮询?(因为我只发送了几个键/值对)如果不是如何用改造来做呢?
我在这里找到了这个帖子,但它也没有帮助我,因为我仍然不知道我是否需要Retrofit + RxJava,是否有更简单的方法?
dav*_*ola 15
假设你为Retrofit定义的接口包含这样的方法:
public Observable<GameState> loadGameState(@Query("id") String gameId);
Run Code Online (Sandbox Code Playgroud)
可以通过以下三种方式之一定义改造方法:
1.)一个简单的同步:
public GameState loadGameState(@Query("id") String gameId);
Run Code Online (Sandbox Code Playgroud)
2.)采用Callback异步处理的方法:
public void loadGameState(@Query("id") String gameId, Callback<GameState> callback);
Run Code Online (Sandbox Code Playgroud)
3.)和返回rxjava的那个Observable,见上文.我想如果你打算将rtrjit与rxjava一起使用,那么使用这个版本是最有意义的.
这样你就可以直接使用Observable作为单个请求,如下所示:
mApiService.loadGameState(mGameId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
});
Run Code Online (Sandbox Code Playgroud)
如果你想使用你能提供反复轮询服务器的"脉搏"使用的版本timer()或interval():
Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(mApiService.loadGameState(mGameId))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
}).
Run Code Online (Sandbox Code Playgroud)
重要的是要注意我在flatMap这里使用而不是map- 因为返回值loadGameState(mGameId)本身就是一个Observable.
但是您在更新中使用的版本也应该有效:
Observable.interval(2, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> Api.ReceiveGameTurn())
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.retry()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(sub);
Run Code Online (Sandbox Code Playgroud)
也就是说,如果ReceiveGameTurn()像我上面的1.)那样同步定义,你可以使用map而不是flatMap.
在这两种情况下onNext,您Subscriber将使用服务器中的最新游戏状态每两秒调用一次.您可以一个接一个地处理它们,通过take(1)之前插入将发射限制为单个项目subscribe().
但是,关于第一个版本:首先会传递一个网络错误onError,然后Observable将停止发出更多项目,使您的订阅者无用而无需输入(请记住,onError只能调用一次).要解决此问题,您可以使用onError*rxjava的任何方法将故障"重定向"到onNext.
例如:
Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<GameState>>(){
@Override
public Observable<GameState> call(Long tick) {
return mApiService.loadGameState(mGameId)
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.onErrorResumeNext(new Func1<Throwable, Observable<GameState>(){
@Override
public Observable<GameState> call(Throwable throwable) {
return Observable.emtpy());
}
});
}
})
.filter(/* check if it is a valid new game state */)
.take(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
}).
Run Code Online (Sandbox Code Playgroud)
这将每两秒钟:*使用Retrofit从服务器获取当前游戏状态*过滤掉无效的*取第一个有效的*和取消订阅
在一个错误的情况下:*它将打印错误消息doOnNext
*否则忽略错误:onErrorResumeNext将"消费"了onError-活动(即你Subscriber的onError就不会被调用),并没有取代它(Observable.empty()).
并且,关于第二个版本:如果出现网络错误,retry将立即重新订阅该间隔 - 并且由于interval在订阅后立即发出第一个Integer,所以也会立即发送下一个请求 - 而不是在您可能需要的3秒之后. .
最后说明:此外,如果您的游戏状态非常大,您还可以首先轮询服务器以询问是否有新状态可用,并且仅在肯定答案的情况下重新加载新游戏状态.
如果您需要更详细的示例,请询问.
更新:我已经重写了这篇文章的部分内容并在其间添加了更多信息.
更新2:我添加了一个错误处理的完整示例onErrorResumeNext.