标签: rx-java

RxJava中的笛卡尔积

是否有可能在rxjava中获得两个Observable的笛卡尔积?

像这样的东西:

A -> 1,2,3
B -> a,b
A x B -> (1, a), (1, b), (2, a), (2, b), (3, a), (3, b)
Run Code Online (Sandbox Code Playgroud)

java rx-java

13
推荐指数
1
解决办法
1070
查看次数

Android上rxjava的默认调度程序

我正在使用Retrofit为我的异步网络调用返回rxjava Observable.

我发现自己重复以下调用:

someApiCall().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())

好像我总是订阅IO线程并观察Android主线程.这似乎是我发现的所有资源的最佳实践.也许除了长时间运行的计算之外,我不太明白何时我们想要偏离这种模式.

有没有办法通过默认subscribeOn和observeOn线程来删除这个样板?

这是rxjava插件的用例吗?(我找不到很多使用它们的例子.)

我可以通过搞乱改造执行器来设置网络边界的默认线程吗?

java multithreading android rx-java retrofit

13
推荐指数
2
解决办法
5621
查看次数

如果第二个立即失败,ReactiveX concat不会从第一个observable产生onNext

我连续两个observable来显示缓存中的数据,然后开始从网络加载数据并显示更新的数据.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler),
    getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

如果没有网络连接,则在调用OnSubscribe后第二个observable会立即失败.

如果第二个observable立即失败,则第一个observable的数据将丢失.永远不会在订阅者中调用onNext方法.

我想,这可能是由于OperatorConcat.ConcatSubscriber中的以下代码造成的

    @Override
    public void onNext(Observable<? extends T> t) {
        queue.add(nl.next(t));
        if (WIP_UPDATER.getAndIncrement(this) == 0) {
            subscribeNext();
        }
    }

    @Override
    public void onError(Throwable e) {
        child.onError(e);
        unsubscribe();
    }
Run Code Online (Sandbox Code Playgroud)

看起来在收到错误后它取消订阅,并且所有挂起的onNext都将丢失.

解决问题的最佳方法是什么?

更新

看起来我找到了解决方案,而不是为连接的observable设置observOn我为每个observable设置了observOn.

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
    getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
 .subscribe(subscriber);
Run Code Online (Sandbox Code Playgroud)

rx-java

13
推荐指数
1
解决办法
2490
查看次数

Realm,RxJava,asObservable()和doOnUnsubscribe()

在我的Android项目中,我使用realm作为我的数据存储引擎.我喜欢它!
我也使用RxJava,因为它使"线程化"变得如此简单,我真的很喜欢整个"反应性思维".我喜欢它!

我使用MVP模式+一些"清洁架构"的想法来构建我的应用程序.

Interactors是唯一知道的人Realm.我在Observable的帮助下公开数据,如下所示:

@Override
public Observable<City> getHomeTown() {
    final Realm realm = Realm.getDefaultInstance();
    return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    realm.close();
                }
            })
            .compose(new NullIfNoRealmObject<City>());
}
Run Code Online (Sandbox Code Playgroud)

问题是我doOnUnsubscribe的副作用被调用之前Realm可以做它的事情,处理暴露的observable:

Caused by: java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:344)
at io.realm.RealmResults.removeChangeListener(RealmResults.java:818)
at io.realm.rx.RealmObservableFactory$3$2.call(RealmObservableFactory.java:137)
at rx.subscriptions.BooleanSubscription.unsubscribe(BooleanSubscription.java:71)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:150)
at …
Run Code Online (Sandbox Code Playgroud)

mvp android realm rx-java

13
推荐指数
1
解决办法
3260
查看次数

用于解构的Kotlin四倍,五倍等

我正在寻找一种干净的方式来在线创建可破坏的对象.kotlin.Pairkotlin.Triple涵盖了很多用例,但有时需要传递更多的对象.

一个示例用例是RX的zip函数,其中几个I/O调用的结果需要映射到另一个对象:

Single
    .zip(repositoryA.loadData(someId),
         repositoryB.loadData(someId),
         repositoryC.loadAll(),
         repositoryD.loadAll()),
         { objectA, objectB, objectsC, objectsD -> /*some Kotlin magic*/ }
    )
    .map { (objectA, objectB, objectsC, objectsD) -> /*do the mapping*/ }
Run Code Online (Sandbox Code Playgroud)

我想弄清楚"一些Kotlin魔法"部分会发生什么.如果只有3个存储库,那就是

Triple(objectA, objectB, objectsC)
Run Code Online (Sandbox Code Playgroud)

我是否需要为此创建一个新的数据类,以及任何n元组的情况,还是有另一种方法?

kotlin rx-java

13
推荐指数
4
解决办法
4738
查看次数

反应式java方法hide()

Observable类中的hide方法用于什么?我阅读了该文档,但仍然不知道它的用途是什么,我看到很多人都使用它

 hide()
    Hides the identity of this Observable and its Disposable.
Run Code Online (Sandbox Code Playgroud)

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html

我们应该何时使用这种方法?

java android rx-java

13
推荐指数
1
解决办法
2260
查看次数

用MVVM android导航

我有一个使用Androids ViewModel类和导航组件的应用程序,用于在片段之间导航.我如何处理ViewModel的导航?我正在使用RxJava,我正在考虑让Fragments监听导航事件,然后以这种方式触发导航.处理这个问题的正常方法是什么?如果有帮助,我也使用Dagger进行依赖注入.

android mvvm rx-java android-viewmodel android-architecture-navigation

13
推荐指数
2
解决办法
5124
查看次数

始终调用Mono switchIfEmpty()

我有两种方法.
主要方法:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}
Run Code Online (Sandbox Code Playgroud)

这个被调用的方法(该服务调用外部api):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} …
Run Code Online (Sandbox Code Playgroud)

java rx-java spring-boot reactive spring-webflux

13
推荐指数
2
解决办法
3455
查看次数

RxJava2/Retrofit2 - 为204 PUT和DELETE请求处理null

因此,我正在使用一个明确定义的API,旨在不返回有效负载主体DELETEPUT操作.

这在Rx 0.X和Rx 1.x中是可接受的.现在我正在更新到Rx 2并且存在一个存在的危机,我应该如何处理空值.内容长度和正文当然是null,导致:

java.lang.NullPointerException: Null is not a valid element
   at io.reactivex.internal.queue.SpscLinkedArrayQueue.offer(SpscLinkedArrayQueue.java:68)
   at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.onNext(ObservableObserveOn.java:116)
   at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
Run Code Online (Sandbox Code Playgroud)

在doOnNext中.

我见过很多人的建议Optional<>但是我也需要支持Java7以用于用例原因.我尝试了后端移植,但我无法让它工作.我也不想膨胀并为他们的版本导入Guava库.

我也注意到flatMap也可以帮我处理这个与地图相反的问题,而我正在阅读这些差异.

目前我有一个非常粗糙的OkHttp3拦截器,它将检查状态,检查有效负载是否为空,并添加错误的虚拟内容.

我也试过添加转换工厂.

任何人都可以提供建议并指导我正确的道路是什么?当然,API可以改变,但204不应该因为它的定义为HTTP状态代码而具有有效载荷.

相关依赖

compile('com.squareup.retrofit2:retrofit:2.1.0') {
     exclude module: 'okhttp'
} 
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
compile 'com.squareup.okhttp3:okhttp:3.5.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.5.0'

compile 'io.reactivex.rxjava2:rxjava:2.0.5'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'com.trello.rxlifecycle2:rxlifecycle:2.0.1'
compile 'com.trello.rxlifecycle2:rxlifecycle-components:2.0.1'
Run Code Online (Sandbox Code Playgroud)

rx-java retrofit2 okhttp3

12
推荐指数
2
解决办法
5223
查看次数

mapper函数返回null值

我为调试和发布设置了相同的构建类型,

buildTypes {
    debug {
        buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
        minifyEnabled true
        shrinkResources true
        proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        signingConfig signingConfigs.release_key
    }
    release {
        buildConfigField "String", "API_BASE_URL", "\"https://www.testUrl.com/api/\""
        minifyEnabled true
        shrinkResources true
        proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        signingConfig signingConfigs.release_key
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,如果我使用该版本构建,我会收到以下错误.此外,服务器响应完全相同.

W/System.err: java.lang.NullPointerException: The mapper function returned a null value.
W/System.err:     at b.a.e.b.b.a(Unknown Source)
W/System.err:     at b.a.e.e.b.bs$a.onNext(Unknown Source)
W/System.err:     at b.a.e.e.b.cm$a.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable$BodyObserver.onNext(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(Unknown Source)
W/System.err:     at b.a.l.subscribe(Unknown Source)
W/System.err:     at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(Unknown Source)
W/System.err:     at …
Run Code Online (Sandbox Code Playgroud)

java android rx-java retrofit2 rx-java2

12
推荐指数
1
解决办法
9660
查看次数