标签: rx-java

使用未收到的消息调试RxJava问题的最佳方法是什么

我有一个Android应用程序,其中有多个A类观察者订阅了几个类型为B的Observable.订阅在IO Scheduler中完成,并在Android主线程上观察.

问题是我在一些工作之后随机发现 B发出的一条消息从未在A中收到过,经过几个小时的重新调整后我无法找到原因.

问题发生时的相关代码:

"NEXT1"和"NEXT2"被打印但"收到","错误","完成"不是.

            //The subscription
            B.getMessate()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(A);

            //B
            Observable<msg> getMessage() {
                 return Observable.create(new Observable.OnSubscribe<msg>() {    
                      public void call(Subscriber<? super msg> subscriber) {
                         ...
                         subscriber.onNext(msg)
                         println("NEXT1")
                      }
                 }).doOnNext({ (o) -> println("NEXT2")});
            }



            //A 
            onNext(msg) {
                  //Never called when problem happens
                  println("RECEIVED")
            }
            onError(msg) {
                  //Never called when problem happens
                  println("ERROR")
            }
            onError(msg) {
                  //Never called when problem happens
                  println("COMPLETED")
            }
Run Code Online (Sandbox Code Playgroud)

任何人都有一些线索?或任何调试建议?

我检查了什么:

  • 我暂停了应用程序并检查了所有线程以查看是否已锁定.并且所有工作线程都被停放,主线程正在等待android消息队列中的消息.
  • 观察员从不打电话取消订阅()

multithreading android rx-java

17
推荐指数
1
解决办法
9569
查看次数

如何使用RxJava处理分页?

我正在考虑转换我的Android应用程序以使用Rxjava进行网络请求.我目前访问的Web服务类似于:

getUsersByKeyword(String query, int limit, int offset)
Run Code Online (Sandbox Code Playgroud)

据我了解,Observables是一个"推"而不是一个"拉"接口.所以这就是我理解要解决的问题:

  • app注册服务,获取Observable进行查询
  • 结果被推送到应用程序
  • 应用处理结果
  • 当app想要更多结果时......?

这就是我崩溃的地方.以前我只是问网络服务我想要什么,再次使用偏移量进行查询.但在这种情况下,将涉及创建另一个Observable并订阅它,有点打败这一点.

我应该如何在我的应用程序中处理分页?(这是一个Android应用程序,但我不认为这是相关的).

rx-java

17
推荐指数
3
解决办法
6298
查看次数

如何在RxJava中继承Observable?

我知道你应该不惜一切代价避免这种情况,但如果我在RxJava中有一个子类Observable的有效用例怎么办?可能吗?我怎么能这样做?

在这种特定情况下,我有一个"存储库"类,它当前返回请求:

class Request<T> {
    public abstract Object key();
    public abstract Observable<T> asObservable();

    [...]

    public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
        Request<T> self = this;
        return new Request<T>() {
             @Override public Object key() { return self.key; }
             @Override public Observable<T> asObservable() { return transformation.call(self); }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我使用transform方法在需要请求键的上下文中修改响应observable(asObservable)(如缓存):

 service.getItemList() // <- returns a Request<List<Item>>
     .transform(r -> r.asObservable()
             // The activity is the current Activity in Android
             .compose(Operators.ensureThereIsAnAccount(activity))
             // The cache comes last because we don't need auth for cached responses …
Run Code Online (Sandbox Code Playgroud)

rx-java

17
推荐指数
1
解决办法
6507
查看次数

RxJava单个后台线程调度程序

我对RxJava很新,所以这可能是一个愚蠢的问题.我将描述我的情景.

我在UI线程上运行了一些代码,它们将更新一些图像,但这些图像不是很重要,它们在生成它们时消耗了一些资源,所以我想在单个线程上生成它们(当然不是UI线程)和逐个生成它们.我猜蹦床调度程序是我想要的,但我的问题是,如果我使用它然后它在UI线程上工作,我希望它在另一个线程上做.

显然我可以编写自己的线程,我可以在其中对项目进行排队,然后逐个处理这些,但我想RxJava可能会有一个简单的解决方案吗?

我当前的代码如下所示:

Observable<Bitmap> getImage = Observable.create(new Observable.OnSubscribe<Bitmap>() {
    @Override public void call(Subscriber<? super Bitmap> subscriber) {
        Log.w(TAG,"On ui thread? "+ UIUtils.isRunningOnUIThread());
        subscriber.onNext(doComplexTaskToGetImage());
        subscriber.onCompleted();
    }
});

getImage.subscribeOn(Schedulers.trampoline()).subscribe(new Action1<Bitmap>() {
    @Override public void call(Bitmap bitmap) {
        codeToSetTheBitmap(bitmap);
    }
});
Run Code Online (Sandbox Code Playgroud)

我的日志上写着"On ui thread?" 永远都是如此.那么我如何使代码和所有后续尝试做同样的事情按顺序在单个线程(而不是ui线程)上运行而不编写一堆代码来排队工作?

编辑:

我相信现在可以使用Schedulers.single()或者如果你想要你自己可以使用它new SingleScheduler().我还在测试,但是当我发布这个帖子时,我认为它确实符合我的要求.

java android rx-java rx-android

17
推荐指数
2
解决办法
1万
查看次数

在活动生命周期中处理RxJava/RxAndroid中的订阅的正确方法是什么?

我刚刚开始使用RxJava/RxAndroid.我想避免上下文泄漏,所以我创建了一个像这样的BaseFragment:

public abstract class BaseFragment extends Fragment {

    protected CompositeSubscription compositeSubscription = new CompositeSubscription();

    @Override
    public void onDestroy() {
        super.onDestroy();

        compositeSubscription.unsubscribe();
    } 
} 
Run Code Online (Sandbox Code Playgroud)

在我的片段中扩展了BaseFragment,我这样做:

protected void fetchNewerObjects(){
        if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects

            Runtime.getRuntime().gc();//clean out memory if possible

            fetchNewObjectsSubscription = Observable
                .just(new Object1())
                .map(new Func1<Object1, Object2>() {
                    @Override
                    public Object2 call(Object1 obj1) {
                        //do bg stuff
                        return obj2;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object2>() {
                    @Override
                    public void onCompleted() {
                        compositeSubscription.remove(fetchNewObjectsSubscription);
                        fetchNewObjectsSubscription = null;
                    }

                    @Override
                    public …
Run Code Online (Sandbox Code Playgroud)

java android rx-java rx-android

17
推荐指数
2
解决办法
1万
查看次数

改造2.0 + RxJava +错误JSON正文

我是RxJava和Retrofit的新手,我正在尝试用它编写API调用.所有API调用都返回错误的JSON主体,其格式为,

{"errors":[{"code":100, "message":"Login/Password not valid", "arguments":null}]}
Run Code Online (Sandbox Code Playgroud)

目前我的登录API调用代码(其他也类似)是,

mConnect.login(id, password)
        .subscribe(new Subscriber<Token>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted()");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError(): " + e);
                if (e instanceof HttpException) {
                  // dump e.response().errorBody()
                }
            }

            @Override
            public void onNext(Token token) {
                Log.d(TAG, "onNext(): " + token);
            }
        });
Run Code Online (Sandbox Code Playgroud)

当我在onError()上收到错误时,我想自动将错误体中的JSON解码为POJO并使用它.有没有办法在一个地方为所有其他API调用执行此操作.任何帮助表示赞赏.

android json rx-java retrofit

17
推荐指数
1
解决办法
4385
查看次数

RxJava和Retrofit2:NetworkOnMainThreadException

我意识到我在MainThread上使用了subscribeOn()/ observeOn().我可以传递给subscribeOn()的选项有哪些?我可以传递给observeOn()的选项有哪些?

12-17 21:36:09.154 20550-20550/rx.test D/MainActivity2: [onCreate]
12-17 21:36:09.231 20550-20550/rx.test D/MainActivity2: starting up observable...
12-17 21:36:09.256 20550-20550/rx.test D/MainActivity2: [onError] 
12-17 21:36:09.256 20550-20550/rx.test W/System.err: android.os.NetworkOnMainThreadException
Run Code Online (Sandbox Code Playgroud)

GovService.java

import java.util.List;
import retrofit.Call;
import retrofit.http.GET;
import rx.Observable;

public interface GovService {
    @GET("/txt2lrn/sat/index_1.json")
    Observable<MyTest> getOneTestRx();
}
Run Code Online (Sandbox Code Playgroud)

MyTest.java

public class MyTest {
    private String name, url;
    private int num;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) …
Run Code Online (Sandbox Code Playgroud)

android rx-java retrofit

17
推荐指数
2
解决办法
1万
查看次数

单元测试android应用程序与改造和rxjava

我已经开发了一个使用rxJava改造的Android应用程序,现在我正在尝试使用Mockito设置单元测试,但我不知道如何模拟api响应以创建不做真正的测试电话但有假响应.

例如,我想测试方法syncGenres对我的SplashPresenter工作正常.我的课程如下:

public class SplashPresenterImpl implements SplashPresenter {

private SplashView splashView;

public SplashPresenterImpl(SplashView splashView) {
    this.splashView = splashView;
}

@Override
public void syncGenres() {
    Api.syncGenres(new Subscriber<List<Genre>>() {
        @Override
        public void onError(Throwable e) {
            if(splashView != null) {
                splashView.onError();
            }
        }

        @Override
        public void onNext(List<Genre> genres) {
            SharedPreferencesUtils.setGenres(genres);
            if(splashView != null) {
                splashView.navigateToHome();
            }
        }
    });
}
}
Run Code Online (Sandbox Code Playgroud)

Api类就像:

public class Api {
    ...
    public static Subscription syncGenres(Subscriber<List<Genre>> apiSubscriber) {
        final Observable<List<Genre>> call = ApiClient.getService().syncGenres();
        return call
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(apiSubscriber);
    } …
Run Code Online (Sandbox Code Playgroud)

android mocking mockito rx-java retrofit2

17
推荐指数
1
解决办法
1万
查看次数

用RxJava中的observable替换回调

我使用侦听器作为回调来观察Android的异步操作,但我认为用RxJava替换这个侦听器可能很棒,我是新的使用这个库,但我真的很喜欢它,我总是在Android项目中使用它.

这是我重构的代码:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}
Run Code Online (Sandbox Code Playgroud)

一个简单的回调:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}
Run Code Online (Sandbox Code Playgroud)

而"观察者":

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});
Run Code Online (Sandbox Code Playgroud)

谢谢!

java rx-java

17
推荐指数
3
解决办法
1万
查看次数

将CompletableFuture <Stream <T >>转换为Publisher <T>是否正确?

允许对结果流进行多次迭代,CompletableFuture<Stream<String>>我正在考虑以下方法之一:

  1. 将生成的未来转换为CompletableFuture<List<String>>:teams.thenApply(st -> st.collect(toList()))

  2. 将生成的未来转换为Flux<String>缓存:Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>项目反应堆的实施.

使用案例:

我想Stream<String>从一个数据源获得一个具有顶级联赛球队名称的序列(例如),该数据源提供一个League对象Standing[](基于足球数据RESTful API,例如http://api.football-data.org/v1/ soccerseasons/445/leagueTable).使用AsyncHttpClientGson我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));
Run Code Online (Sandbox Code Playgroud)

要重新使用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()
Run Code Online (Sandbox Code Playgroud)

Flux<T>不那么冗长,并提供我所需要的一切.然而,在这种情况下使用它是否正确?

或者我应该使用CompletableFuture<List<String>>?或者还有其他更好的选择吗?

更新了一些想法(2018-03-16):

CompletableFuture<List<String>>:

  • [PROS] List<String>将继续收集,当我们需要继续处理未来的结果时,可能已经完成了.
  • [CONS]宣言冗长.
  • [CONS]如果我们只想使用它一次,那么我们就不需要收集那些物品了List<T>.

Flux<String> …

java java-8 rx-java project-reactor completable-future

17
推荐指数
1
解决办法
2030
查看次数