标签: rx-java

Java反应框架的比较

我看到许多框架/库声称他们可以帮助用Java构建反应式应用程序,例如:Akka,Vert.x,RxJava,Reactor,QBit等.

他们似乎有不同的方法,功能,优点,缺点等.我找不到详细的比较.有关于这些框架中的每一个的文档,但我不能理解这些差异.

主要Java反应框架之间有什么区别?

什么是可以推动Java反应式框架选择的应用程序需求?

感谢您的时间.

java reactive-programming akka rx-java project-reactor

29
推荐指数
1
解决办法
1万
查看次数

Android Rxjava订阅了变量

我正在学习观察者模式,我希望我的观察者在改变它的值并做一些操作时跟踪某个变量,我做了类似的事情:

public class Test extends MyChildActivity {

   private int VARIABLE_TO_OBSERVE = 0;

   Observable<Integer> mObservable = Observable.just(VARIABLE_TO_OBSERVE);  

   protected void onCreate() {/*onCreate method*/
       super();
       setContentView();
       method();
       changeVariable();
   }

   public void changeVariable() {
       VARIABLE_TO_OBSERVE = 1;
   }

   public void method() {
       mObservable.map(value -> {
            if (value == 1) doMethod2();
            return String.valueOf(value);
       }).subScribe(string -> System.out.println(string));
   }

   public void doMethod2() {/*Do additional operations*/}

}
Run Code Online (Sandbox Code Playgroud)

但doMethod2()不会被调用

android observer-pattern rx-java

28
推荐指数
3
解决办法
3万
查看次数

什么是RxJava中的CompositeDisposable

我是一名Android学员.我想学习RxJava.我的问题是"RxJava中的CompositeDisposable是什么?".请详细说明.

android rx-java

28
推荐指数
3
解决办法
9960
查看次数

如何处理RxJava中观察者的onNext引发的异常?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这将输出1到5之间的数字,然后打印异常.

我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.

我已经尝试过使用retry()其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.

最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:

Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
        origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));

proxy.subscribe(i -> {
    System.out.println(i);

    if (i == 5) {
        throw new RuntimeException("oops!");
    }
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)

这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.

我该怎么做才能解决这个问题?

java reactive-programming system.reactive rx-java

27
推荐指数
1
解决办法
6656
查看次数

如何正确处理RxJava(Android)中的onError?

我正在获取设备上已安装应用的列表.这是一个代价高昂的操作,所以我正在使用Rx:

    Observable<List> observable = Observable.create(subscriber -> {
        List result = getUserApps();

        subscriber.onNext(result);
        subscriber.onError(new Throwable());
        subscriber.onCompleted();
    });

    observable
            .map(s -> {
                ArrayList<String> list = new ArrayList<>();
                ArrayList<Application> applist = new ArrayList<>();
                for (Application p : (ArrayList<Application>) s) {
                    list.add(p.getAppName());
                    applist.add(p);
                }
                return applist;
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
            .subscribe(s -> createListView(s, view));
Run Code Online (Sandbox Code Playgroud)

但是,我的问题是处理错误.通常,用户启动此屏幕,等待加载应用程序,选择最佳选择并转到下一页.但是,当用户快速更改UI时 - 应用程序与NullPointer崩溃.

好的,所以我实现了这个onError.但是它仍然无效,并且在上面的用例中它会抛出这个:

    04-15 18:12:42.530  22388-22388/pl.digitalvirgo.safemob E/AndroidRuntime? FATAL EXCEPTION: main
        java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:52) …
Run Code Online (Sandbox Code Playgroud)

android reactive-programming rx-java

27
推荐指数
3
解决办法
2万
查看次数

具有多个订户的单个可观察者

我有一个Observable<<List<Foo>> getFoo()从Retrofit服务创建的,在调用该 .getFoo()方法后,我需要与多个订阅者共享它..share()但是,调用该方法会导致重新执行网络呼叫.重播运算符也不起作用.我知道可能有一个潜在的解决方案.cache(),但我不知道为什么会出现这种行为.

// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
       .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors);
            }
         });

Subscription subscription2 = testObservable
        .subscribe(new …
Run Code Online (Sandbox Code Playgroud)

android rx-java retrofit

27
推荐指数
2
解决办法
1万
查看次数

我是否必须取消订阅已完成的观察?

如果一个observable完成了,我是否还必须取消订阅/处置(在RxJava2中)observable以删除Observer(防止内存泄漏),或者一旦发生onCompleteonError事件发生,这是由RxJava内部处理的吗?

对其他类型的像什么Single,Completable,Flowable等.

java rx-java rx-java2

27
推荐指数
2
解决办法
5858
查看次数

RxJava与Java 8并行流

它们之间有什么相似之处和不同之处,看起来Java Parallel Stream有一些RXJava中可用的元素,是吗?

java parallel-processing rx-java java-stream

26
推荐指数
1
解决办法
4972
查看次数

使用RxJava处理分页

我在Android应用程序上使用Retrofit + RxJava,并且问自己如何处理链接调用的API分页,直到检索到所有数据.是这样的:

Observable<ApiResponse> getResults(@Query("page") int page);
Run Code Online (Sandbox Code Playgroud)

所述ApiResponse对象具有简单的结构:

class ApiResponse {
    int current;
    Integer next;
    List<ResponseObject> results;
}
Run Code Online (Sandbox Code Playgroud)

API将返回下一个值,直到最后一页为止.

有一些很好的方法来实现这一目标吗?试图结合一些flatMaps(),但没有成功.

pagination android rx-java retrofit

26
推荐指数
1
解决办法
1万
查看次数

使用RxJava连锁两个改造的可观测量

我想一个接一个地执行2个网络呼叫.两个网络调用都返回Observable.第二呼叫从所述第一呼叫的成功的结果使用数据,在第二个呼叫的成功的结果从方法使用数据两者第二呼叫的所述第一的成功的结果和.此外,我应该能够处理这两个 onError的"事件"是不同的.我怎样才能避免回调地狱,如下例所示:

       API().auth(email, password)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<AuthResponse>() {
                @Override
                public void call(final AuthResponse authResponse) {
                    API().getUser(authResponse.getAccessToken())
                            .subscribe(new Action1<List<User>>() {
                                @Override
                                public void call(List<User> users) {
                                    doSomething(authResponse, users);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {
                                    onErrorGetUser();
                                }
                            });
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    onErrorAuth();
                }
            });
Run Code Online (Sandbox Code Playgroud)

我知道zip,但我想避免创建"Combiner类".

更新1. 试图实现akarnokd的答案:

         API()
            .auth(email, password)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap(authResponse -> API()
                    .getUser(authResponse.getAccessToken())
                    .doOnError(throwable -> {
                        getView().setError(processFail(throwable));
                    }), ((authResponse, users) …
Run Code Online (Sandbox Code Playgroud)

java android reactive-programming rx-java retrofit

26
推荐指数
2
解决办法
1万
查看次数