是否有可能在rxjava中获得两个Observable的笛卡尔积?
像这样的东西:
A -> 1,2,3
B -> a,b
A x B -> (1, a), (1, b), (2, a), (2, b), (3, a), (3, b)
Run Code Online (Sandbox Code Playgroud) 我正在使用Retrofit为我的异步网络调用返回rxjava Observable.
我发现自己重复以下调用:
someApiCall().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
好像我总是订阅IO线程并观察Android主线程.这似乎是我发现的所有资源的最佳实践.也许除了长时间运行的计算之外,我不太明白何时我们想要偏离这种模式.
有没有办法通过默认subscribeOn和observeOn线程来删除这个样板?
这是rxjava插件的用例吗?(我找不到很多使用它们的例子.)
我可以通过搞乱改造执行器来设置网络边界的默认线程吗?
我连续两个observable来显示缓存中的数据,然后开始从网络加载数据并显示更新的数据.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)
如果没有网络连接,则在调用OnSubscribe后第二个observable会立即失败.
如果第二个observable立即失败,则第一个observable的数据将丢失.永远不会在订阅者中调用onNext方法.
我想,这可能是由于OperatorConcat.ConcatSubscriber中的以下代码造成的
@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}
Run Code Online (Sandbox Code Playgroud)
看起来在收到错误后它取消订阅,并且所有挂起的onNext都将丢失.
解决问题的最佳方法是什么?
更新
看起来我找到了解决方案,而不是为连接的observable设置observOn我为每个observable设置了observOn.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud) 在我的Android项目中,我使用realm作为我的数据存储引擎.我喜欢它!
我也使用RxJava,因为它使"线程化"变得如此简单,我真的很喜欢整个"反应性思维".我喜欢它!
我使用MVP模式+一些"清洁架构"的想法来构建我的应用程序.
我Interactors是唯一知道的人Realm.我在Observable的帮助下公开数据,如下所示:
@Override
public Observable<City> getHomeTown() {
final Realm realm = Realm.getDefaultInstance();
return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
realm.close();
}
})
.compose(new NullIfNoRealmObject<City>());
}
Run Code Online (Sandbox Code Playgroud)
问题是我doOnUnsubscribe的副作用被调用之前Realm可以做它的事情,处理暴露的observable:
Caused by: java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:344)
at io.realm.RealmResults.removeChangeListener(RealmResults.java:818)
at io.realm.rx.RealmObservableFactory$3$2.call(RealmObservableFactory.java:137)
at rx.subscriptions.BooleanSubscription.unsubscribe(BooleanSubscription.java:71)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:150)
at …Run Code Online (Sandbox Code Playgroud) 我正在寻找一种干净的方式来在线创建可破坏的对象.kotlin.Pair并kotlin.Triple涵盖了很多用例,但有时需要传递更多的对象.
一个示例用例是RX的zip函数,其中几个I/O调用的结果需要映射到另一个对象:
Single
.zip(repositoryA.loadData(someId),
repositoryB.loadData(someId),
repositoryC.loadAll(),
repositoryD.loadAll()),
{ objectA, objectB, objectsC, objectsD -> /*some Kotlin magic*/ }
)
.map { (objectA, objectB, objectsC, objectsD) -> /*do the mapping*/ }
Run Code Online (Sandbox Code Playgroud)
我想弄清楚"一些Kotlin魔法"部分会发生什么.如果只有3个存储库,那就是
Triple(objectA, objectB, objectsC)
Run Code Online (Sandbox Code Playgroud)
我是否需要为此创建一个新的数据类,以及任何n元组的情况,还是有另一种方法?
Observable类中的hide方法用于什么?我阅读了该文档,但仍然不知道它的用途是什么,我看到很多人都使用它
hide()
Hides the identity of this Observable and its Disposable.
Run Code Online (Sandbox Code Playgroud)
http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html
我们应该何时使用这种方法?
我有一个使用Androids ViewModel类和导航组件的应用程序,用于在片段之间导航.我如何处理ViewModel的导航?我正在使用RxJava,我正在考虑让Fragments监听导航事件,然后以这种方式触发导航.处理这个问题的正常方法是什么?如果有帮助,我也使用Dagger进行依赖注入.
android mvvm rx-java android-viewmodel android-architecture-navigation
我有两种方法.
主要方法:
@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
return socialService.verifyAccount(loginUser)
.flatMap(socialAccountIsValid -> {
if (socialAccountIsValid) {
return this.userService.getUserByEmail(loginUser.getEmail())
.switchIfEmpty(insertUser(loginUser))
.flatMap(foundUser -> updateUser(loginUser, foundUser))
.map(savedUser -> {
String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
return new ResponseEntity<>(HttpStatus.OK);
});
} else {
return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
}
});
}
Run Code Online (Sandbox Code Playgroud)
这个被调用的方法(该服务调用外部api):
public Mono<User> getUserByEmail(String email) {
UriComponentsBuilder builder = UriComponentsBuilder
.fromHttpUrl(USER_API_BASE_URI)
.queryParam("email", email);
return this.webClient.get()
.uri(builder.toUriString())
.exchange()
.flatMap(resp -> {
if (Integer.valueOf(404).equals(resp.statusCode().value())) {
return Mono.empty();
} else {
return resp.bodyToMono(User.class);
}
});
} …Run Code Online (Sandbox Code Playgroud) 因此,我正在使用一个明确定义的API,旨在不返回有效负载主体DELETE和PUT操作.
这在Rx 0.X和Rx 1.x中是可接受的.现在我正在更新到Rx 2并且存在一个存在的危机,我应该如何处理空值.内容长度和正文当然是null,导致:
java.lang.NullPointerException: Null is not a valid element
at io.reactivex.internal.queue.SpscLinkedArrayQueue.offer(SpscLinkedArrayQueue.java:68)
at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.onNext(ObservableObserveOn.java:116)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
Run Code Online (Sandbox Code Playgroud)
在doOnNext中.
我见过很多人的建议Optional<>但是我也需要支持Java7以用于用例原因.我尝试了后端移植,但我无法让它工作.我也不想膨胀并为他们的版本导入Guava库.
我也注意到flatMap也可以帮我处理这个与地图相反的问题,而我正在阅读这些差异.
目前我有一个非常粗糙的OkHttp3拦截器,它将检查状态,检查有效负载是否为空,并添加错误的虚拟内容.
我也试过添加转换工厂.
任何人都可以提供建议并指导我正确的道路是什么?当然,API可以改变,但204不应该因为它的定义为HTTP状态代码而具有有效载荷.
compile('com.squareup.retrofit2:retrofit:2.1.0') {
exclude module: 'okhttp'
}
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
compile 'com.squareup.okhttp3:okhttp:3.5.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.5.0'
compile 'io.reactivex.rxjava2:rxjava:2.0.5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.trello.rxlifecycle2:rxlifecycle:2.0.1'
compile 'com.trello.rxlifecycle2:rxlifecycle-components:2.0.1'
Run Code Online (Sandbox Code Playgroud) 我为调试和发布设置了相同的构建类型,
buildTypes {
debug {
buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
minifyEnabled true
shrinkResources true
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
signingConfig signingConfigs.release_key
}
release {
buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
minifyEnabled true
shrinkResources true
proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
signingConfig signingConfigs.release_key
}
}
Run Code Online (Sandbox Code Playgroud)
但是,如果我使用该版本构建,我会收到以下错误.此外,服务器响应完全相同.
W/System.err: java.lang.NullPointerException: The mapper function returned a null value.
W/System.err: at b.a.e.b.b.a(Unknown Source)
W/System.err: at b.a.e.e.b.bs$a.onNext(Unknown Source)
W/System.err: at b.a.e.e.b.cm$a.onNext(Unknown Source)
W/System.err: at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err: at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err: at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(Unknown Source)
W/System.err: at b.a.l.subscribe(Unknown Source)
W/System.err: at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(Unknown Source)
W/System.err: at …Run Code Online (Sandbox Code Playgroud)