Realm,RxJava,asObservable()和doOnUnsubscribe()

Tud*_*uca 13 mvp android realm rx-java

在我的Android项目中,我使用realm作为我的数据存储引擎.我喜欢它!
我也使用RxJava,因为它使"线程化"变得如此简单,我真的很喜欢整个"反应性思维".我喜欢它!

我使用MVP模式+一些"清洁架构"的想法来构建我的应用程序.

Interactors是唯一知道的人Realm.我在Observable的帮助下公开数据,如下所示:

@Override
public Observable<City> getHomeTown() {
    final Realm realm = Realm.getDefaultInstance();
    return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    realm.close();
                }
            })
            .compose(new NullIfNoRealmObject<City>());
}
Run Code Online (Sandbox Code Playgroud)

问题是我doOnUnsubscribe的副作用被调用之前Realm可以做它的事情,处理暴露的observable:

Caused by: java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.
at io.realm.BaseRealm.checkIfValid(BaseRealm.java:344)
at io.realm.RealmResults.removeChangeListener(RealmResults.java:818)
at io.realm.rx.RealmObservableFactory$3$2.call(RealmObservableFactory.java:137)
at rx.subscriptions.BooleanSubscription.unsubscribe(BooleanSubscription.java:71)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.internal.util.SubscriptionList.unsubscribeFromAll(SubscriptionList.java:124)
at rx.internal.util.SubscriptionList.unsubscribe(SubscriptionList.java:113)
at rx.Subscriber.unsubscribe(Subscriber.java:98)
at rx.subscriptions.CompositeSubscription.unsubscribeFromAll(CompositeSubscription.java:150)
at rx.subscriptions.CompositeSubscription.unsubscribe(CompositeSubscription.java:139)
at ro.tudorluca.realm.sandbox.city.CityPresenter.onDestroy(CityPresenter.java:62)
at ro.tudorluca.realm.sandbox.city.CityActivity.onDestroy(CityActivity.java:35)
Run Code Online (Sandbox Code Playgroud)

我为这个用例创建了一个沙箱项目.

我真的很喜欢使用Realm + RxJava,但是close当我unsubscribe(我通常在活动被破坏时取消订阅)时,我似乎无法找到Realm实例的干净解决方案.有任何想法吗?

编辑1:https://github.com/realm/realm-java/issues/2357
编辑2:感谢非常活跃的领域团队,已经有一个解决此问题的拉取请求.

Tud*_*uca 2

21 小时后,这就是我想出的结果:

@Override
public Observable<City> getHomeTown() {
    return getManagedRealm()
            .concatMap(new Func1<Realm, Observable<City>>() {
                @Override
                public Observable<City> call(Realm realm) {
                    return realm.where(City.class).equalTo("name", "Cluj-Napoca").findAllAsync().asObservable()
                            .compose(new NullIfNoRealmObject<City>());
                }
            });
}

private static Observable<Realm> getManagedRealm() {
    return Observable.create(new Observable.OnSubscribe<Realm>() {
        @Override
        public void call(final Subscriber<? super Realm> subscriber) {
            final Realm realm = Realm.getDefaultInstance();
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    realm.close();
                }
            }));
            subscriber.onNext(realm);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

在将问题发布到 stackoverflow 之前,我尝试过类似的操作,但我的错误是使用flatMap(), 而不是concatMap()

与 不同的是flatMap()concatMap()将保持排放的顺序,在我的例子中,这意味着 myAction0 -> realm.close()将是从流取消订阅后调用的最后一个操作,在Action0 -> results.removeChangeListener(listener)导致问题的 Realm 操作之后。

完整的示例可以在github上找到。

编辑:感谢非常活跃的领域团队,已经有一个拉取请求来解决这个问题。