我有一个Android应用程序,其中有多个A类观察者订阅了几个类型为B的Observable.订阅在IO Scheduler中完成,并在Android主线程上观察.
问题是我在一些工作之后随机发现 B发出的一条消息从未在A中收到过,经过几个小时的重新调整后我无法找到原因.
问题发生时的相关代码:
"NEXT1"和"NEXT2"被打印但"收到","错误","完成"不是.
//The subscription
B.getMessate()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(A);
//B
Observable<msg> getMessage() {
return Observable.create(new Observable.OnSubscribe<msg>() {
public void call(Subscriber<? super msg> subscriber) {
...
subscriber.onNext(msg)
println("NEXT1")
}
}).doOnNext({ (o) -> println("NEXT2")});
}
//A
onNext(msg) {
//Never called when problem happens
println("RECEIVED")
}
onError(msg) {
//Never called when problem happens
println("ERROR")
}
onError(msg) {
//Never called when problem happens
println("COMPLETED")
}
Run Code Online (Sandbox Code Playgroud)
任何人都有一些线索?或任何调试建议?
我检查了什么:
我正在考虑转换我的Android应用程序以使用Rxjava进行网络请求.我目前访问的Web服务类似于:
getUsersByKeyword(String query, int limit, int offset)
Run Code Online (Sandbox Code Playgroud)
据我了解,Observables是一个"推"而不是一个"拉"接口.所以这就是我理解要解决的问题:
这就是我崩溃的地方.以前我只是问网络服务我想要什么,再次使用偏移量进行查询.但在这种情况下,将涉及创建另一个Observable并订阅它,有点打败这一点.
我应该如何在我的应用程序中处理分页?(这是一个Android应用程序,但我不认为这是相关的).
我知道你应该不惜一切代价避免这种情况,但如果我在RxJava中有一个子类Observable的有效用例怎么办?可能吗?我怎么能这样做?
在这种特定情况下,我有一个"存储库"类,它当前返回请求:
class Request<T> {
public abstract Object key();
public abstract Observable<T> asObservable();
[...]
public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
Request<T> self = this;
return new Request<T>() {
@Override public Object key() { return self.key; }
@Override public Observable<T> asObservable() { return transformation.call(self); }
}
}
}
Run Code Online (Sandbox Code Playgroud)
然后我使用transform方法在需要请求键的上下文中修改响应observable(asObservable)(如缓存):
service.getItemList() // <- returns a Request<List<Item>>
.transform(r -> r.asObservable()
// The activity is the current Activity in Android
.compose(Operators.ensureThereIsAnAccount(activity))
// The cache comes last because we don't need auth for cached responses …Run Code Online (Sandbox Code Playgroud) 我对RxJava很新,所以这可能是一个愚蠢的问题.我将描述我的情景.
我在UI线程上运行了一些代码,它们将更新一些图像,但这些图像不是很重要,它们在生成它们时消耗了一些资源,所以我想在单个线程上生成它们(当然不是UI线程)和逐个生成它们.我猜蹦床调度程序是我想要的,但我的问题是,如果我使用它然后它在UI线程上工作,我希望它在另一个线程上做.
显然我可以编写自己的线程,我可以在其中对项目进行排队,然后逐个处理这些,但我想RxJava可能会有一个简单的解决方案吗?
我当前的代码如下所示:
Observable<Bitmap> getImage = Observable.create(new Observable.OnSubscribe<Bitmap>() {
@Override public void call(Subscriber<? super Bitmap> subscriber) {
Log.w(TAG,"On ui thread? "+ UIUtils.isRunningOnUIThread());
subscriber.onNext(doComplexTaskToGetImage());
subscriber.onCompleted();
}
});
getImage.subscribeOn(Schedulers.trampoline()).subscribe(new Action1<Bitmap>() {
@Override public void call(Bitmap bitmap) {
codeToSetTheBitmap(bitmap);
}
});
Run Code Online (Sandbox Code Playgroud)
我的日志上写着"On ui thread?" 永远都是如此.那么我如何使代码和所有后续尝试做同样的事情按顺序在单个线程(而不是ui线程)上运行而不编写一堆代码来排队工作?
编辑:
我相信现在可以使用Schedulers.single()或者如果你想要你自己可以使用它new SingleScheduler().我还在测试,但是当我发布这个帖子时,我认为它确实符合我的要求.
我刚刚开始使用RxJava/RxAndroid.我想避免上下文泄漏,所以我创建了一个像这样的BaseFragment:
public abstract class BaseFragment extends Fragment {
protected CompositeSubscription compositeSubscription = new CompositeSubscription();
@Override
public void onDestroy() {
super.onDestroy();
compositeSubscription.unsubscribe();
}
}
Run Code Online (Sandbox Code Playgroud)
在我的片段中扩展了BaseFragment,我这样做:
protected void fetchNewerObjects(){
if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects
Runtime.getRuntime().gc();//clean out memory if possible
fetchNewObjectsSubscription = Observable
.just(new Object1())
.map(new Func1<Object1, Object2>() {
@Override
public Object2 call(Object1 obj1) {
//do bg stuff
return obj2;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object2>() {
@Override
public void onCompleted() {
compositeSubscription.remove(fetchNewObjectsSubscription);
fetchNewObjectsSubscription = null;
}
@Override
public …Run Code Online (Sandbox Code Playgroud) 我是RxJava和Retrofit的新手,我正在尝试用它编写API调用.所有API调用都返回错误的JSON主体,其格式为,
{"errors":[{"code":100, "message":"Login/Password not valid", "arguments":null}]}
Run Code Online (Sandbox Code Playgroud)
目前我的登录API调用代码(其他也类似)是,
mConnect.login(id, password)
.subscribe(new Subscriber<Token>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError(): " + e);
if (e instanceof HttpException) {
// dump e.response().errorBody()
}
}
@Override
public void onNext(Token token) {
Log.d(TAG, "onNext(): " + token);
}
});
Run Code Online (Sandbox Code Playgroud)
当我在onError()上收到错误时,我想自动将错误体中的JSON解码为POJO并使用它.有没有办法在一个地方为所有其他API调用执行此操作.任何帮助表示赞赏.
我意识到我在MainThread上使用了subscribeOn()/ observeOn().我可以传递给subscribeOn()的选项有哪些?我可以传递给observeOn()的选项有哪些?
12-17 21:36:09.154 20550-20550/rx.test D/MainActivity2: [onCreate]
12-17 21:36:09.231 20550-20550/rx.test D/MainActivity2: starting up observable...
12-17 21:36:09.256 20550-20550/rx.test D/MainActivity2: [onError]
12-17 21:36:09.256 20550-20550/rx.test W/System.err: android.os.NetworkOnMainThreadException
Run Code Online (Sandbox Code Playgroud)
GovService.java
import java.util.List;
import retrofit.Call;
import retrofit.http.GET;
import rx.Observable;
public interface GovService {
@GET("/txt2lrn/sat/index_1.json")
Observable<MyTest> getOneTestRx();
}
Run Code Online (Sandbox Code Playgroud)
MyTest.java
public class MyTest {
private String name, url;
private int num;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getUrl() {
return url;
}
public void setUrl(String url) …Run Code Online (Sandbox Code Playgroud) 我已经开发了一个使用rxJava改造的Android应用程序,现在我正在尝试使用Mockito设置单元测试,但我不知道如何模拟api响应以创建不做真正的测试电话但有假响应.
例如,我想测试方法syncGenres对我的SplashPresenter工作正常.我的课程如下:
public class SplashPresenterImpl implements SplashPresenter {
private SplashView splashView;
public SplashPresenterImpl(SplashView splashView) {
this.splashView = splashView;
}
@Override
public void syncGenres() {
Api.syncGenres(new Subscriber<List<Genre>>() {
@Override
public void onError(Throwable e) {
if(splashView != null) {
splashView.onError();
}
}
@Override
public void onNext(List<Genre> genres) {
SharedPreferencesUtils.setGenres(genres);
if(splashView != null) {
splashView.navigateToHome();
}
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
Api类就像:
public class Api {
...
public static Subscription syncGenres(Subscriber<List<Genre>> apiSubscriber) {
final Observable<List<Genre>> call = ApiClient.getService().syncGenres();
return call
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(apiSubscriber);
} …Run Code Online (Sandbox Code Playgroud) 我使用侦听器作为回调来观察Android的异步操作,但我认为用RxJava替换这个侦听器可能很棒,我是新的使用这个库,但我真的很喜欢它,我总是在Android项目中使用它.
这是我重构的代码:
public void getData( final OnResponseListener listener ){
if(data!=null && !data.isEmpty()){
listener.onSuccess();
}
else{
listener.onError();
}
}
Run Code Online (Sandbox Code Playgroud)
一个简单的回调:
public interface OnResponseListener {
public void onSuccess();
public void onError();
}
Run Code Online (Sandbox Code Playgroud)
而"观察者":
object.getData( new OnResponseListener() {
@Override
public void onSuccess() {
Log.w(TAG," on success");
}
@Override
public void onError() {
Log.e(TAG," on error");
}
});
Run Code Online (Sandbox Code Playgroud)
谢谢!
允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:
将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))
将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();
Flux<T>是Publisher<T>项目反应堆的实施.
使用案例:
我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClient和Gson我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)
要重新使用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)
Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?
或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?
更新了一些想法(2018-03-16):
CompletableFuture<List<String>>:
List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.List<T>.Flux<String> …