我看到许多框架/库声称他们可以帮助用Java构建反应式应用程序,例如:Akka,Vert.x,RxJava,Reactor,QBit等.
他们似乎有不同的方法,功能,优点,缺点等.我找不到详细的比较.有关于这些框架中的每一个的文档,但我不能理解这些差异.
主要Java反应框架之间有什么区别?
什么是可以推动Java反应式框架选择的应用程序需求?
感谢您的时间.
我正在学习观察者模式,我希望我的观察者在改变它的值并做一些操作时跟踪某个变量,我做了类似的事情:
public class Test extends MyChildActivity {
private int VARIABLE_TO_OBSERVE = 0;
Observable<Integer> mObservable = Observable.just(VARIABLE_TO_OBSERVE);
protected void onCreate() {/*onCreate method*/
super();
setContentView();
method();
changeVariable();
}
public void changeVariable() {
VARIABLE_TO_OBSERVE = 1;
}
public void method() {
mObservable.map(value -> {
if (value == 1) doMethod2();
return String.valueOf(value);
}).subScribe(string -> System.out.println(string));
}
public void doMethod2() {/*Do additional operations*/}
}
Run Code Online (Sandbox Code Playgroud)
但doMethod2()不会被调用
我是一名Android学员.我想学习RxJava.我的问题是"RxJava中的CompositeDisposable是什么?".请详细说明.
请考虑以下示例:
Observable.range(1, 10).subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这将输出1到5之间的数字,然后打印异常.
我想要实现的是让观察者保持订阅并在抛出异常后继续运行,即打印从1到10的所有数字.
我已经尝试过使用retry()和其他各种错误处理操作符,但是,如文档中所述,它们的目的是处理observable本身发出的错误.
最简单的解决方案就是将整个主体包装onNext成一个try-catch块,但这对我来说听起来不是一个好方法.在类似的Rx.NET问题中,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装.我试图重拍它:
Observable<Integer> origin = Observable.range(1, 10);
Observable<Integer> proxy = Observable.create((Observable.OnSubscribe<Integer>) s ->
origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted));
proxy.subscribe(i -> {
System.out.println(i);
if (i == 5) {
throw new RuntimeException("oops!");
}
}, Throwable::printStackTrace);
Run Code Online (Sandbox Code Playgroud)
这不会改变任何东西,因为RxJava本身将订阅者包装成一个SafeSubscriber.使用unsafeSubscribe它来解决它似乎也不是一个好的解决方案.
我该怎么做才能解决这个问题?
我正在获取设备上已安装应用的列表.这是一个代价高昂的操作,所以我正在使用Rx:
Observable<List> observable = Observable.create(subscriber -> {
List result = getUserApps();
subscriber.onNext(result);
subscriber.onError(new Throwable());
subscriber.onCompleted();
});
observable
.map(s -> {
ArrayList<String> list = new ArrayList<>();
ArrayList<Application> applist = new ArrayList<>();
for (Application p : (ArrayList<Application>) s) {
list.add(p.getAppName());
applist.add(p);
}
return applist;
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(throwable -> L.e(TAG, "Throwable " + throwable.getMessage()))
.subscribe(s -> createListView(s, view));
Run Code Online (Sandbox Code Playgroud)
但是,我的问题是处理错误.通常,用户启动此屏幕,等待加载应用程序,选择最佳选择并转到下一页.但是,当用户快速更改UI时 - 应用程序与NullPointer崩溃.
好的,所以我实现了这个onError.但是它仍然无效,并且在上面的用例中它会抛出这个:
04-15 18:12:42.530 22388-22388/pl.digitalvirgo.safemob E/AndroidRuntime? FATAL EXCEPTION: main
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:52) …Run Code Online (Sandbox Code Playgroud) 我有一个Observable<<List<Foo>> getFoo()从Retrofit服务创建的,在调用该
.getFoo()方法后,我需要与多个订阅者共享它..share()但是,调用该方法会导致重新执行网络呼叫.重播运算符也不起作用.我知道可能有一个潜在的解决方案.cache(),但我不知道为什么会出现这种行为.
// Create an instance of our GitHub API interface.
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(API_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new …Run Code Online (Sandbox Code Playgroud) 如果一个observable完成了,我是否还必须取消订阅/处置(在RxJava2中)observable以删除Observer(防止内存泄漏),或者一旦发生onComplete或onError事件发生,这是由RxJava内部处理的吗?
对其他类型的像什么Single,Completable,Flowable等.
它们之间有什么相似之处和不同之处,看起来Java Parallel Stream有一些RXJava中可用的元素,是吗?
我在Android应用程序上使用Retrofit + RxJava,并且问自己如何处理链接调用的API分页,直到检索到所有数据.是这样的:
Observable<ApiResponse> getResults(@Query("page") int page);
Run Code Online (Sandbox Code Playgroud)
所述ApiResponse对象具有简单的结构:
class ApiResponse {
int current;
Integer next;
List<ResponseObject> results;
}
Run Code Online (Sandbox Code Playgroud)
API将返回下一个值,直到最后一页为止.
有一些很好的方法来实现这一目标吗?试图结合一些flatMaps(),但没有成功.
我想一个接一个地执行2个网络呼叫.两个网络调用都返回Observable.第二呼叫从所述第一呼叫的成功的结果使用数据,在第二个呼叫的成功的结果从方法使用数据两者第二呼叫的所述第一的成功的结果和.此外,我应该能够处理这两个 onError的"事件"是不同的.我怎样才能避免回调地狱,如下例所示:
API().auth(email, password)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<AuthResponse>() {
@Override
public void call(final AuthResponse authResponse) {
API().getUser(authResponse.getAccessToken())
.subscribe(new Action1<List<User>>() {
@Override
public void call(List<User> users) {
doSomething(authResponse, users);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
onErrorGetUser();
}
});
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
onErrorAuth();
}
});
Run Code Online (Sandbox Code Playgroud)
我知道zip,但我想避免创建"Combiner类".
更新1. 试图实现akarnokd的答案:
API()
.auth(email, password)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(authResponse -> API()
.getUser(authResponse.getAccessToken())
.doOnError(throwable -> {
getView().setError(processFail(throwable));
}), ((authResponse, users) …Run Code Online (Sandbox Code Playgroud)