tic*_*fab 14 android rx-java retrofit retrofit2
我可以使用Retrofit + RxJava来聆听无尽的流吗?例如Twitter流.我有的是这个:
public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}
MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);
api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));
Run Code Online (Sandbox Code Playgroud)
但在解析第一个对象后调用"onComplete".有没有办法告诉Retrofit保持开放直到另行通知?
zel*_*lla 17
我的解决方案:
您可以使用@Streaming批注:
public interface ITwitterAPI {
@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);
Run Code Online (Sandbox Code Playgroud)
随着@Streaming我们可以得到原始输入从ResponseBody.
这里我的函数用于通过事件将行换行:
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}
Run Code Online (Sandbox Code Playgroud)
结果用法:
api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)
关于优雅地停止
当我们取消订阅时,改造将关闭输入流.但是不可能从输入流本身检测输入流是否关闭,所以只有这样 - 尝试从流中读取 - 我们得到Socket closed消息异常.我们可以将此异常解释为结束:
@Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}
Run Code Online (Sandbox Code Playgroud)
如果连接因响应结束而关闭,我们没有任何例外.在这里isCompleted检查.如果我错了,请告诉我:)
| 归档时间: |
|
| 查看次数: |
4203 次 |
| 最近记录: |