RxJava:在每个订阅者处理完资源后关闭资源

G. *_*ard 4 android rx-java rx-android

我是RxJava的新手,我正在努力弄清楚如何正确地关闭资源,特别是在处理多个订阅者时.

我有一个Observable<T>地方T是一些Closeable资源(例如说一个Android数据库Cursor.

我可能在observable上有多个订阅者.close()在每个订阅者完成处理之后我想要资源.换句话说,在新资源交付/发出后关闭旧资源,最后在最后一个订阅者取消订阅时关闭最后一个资源.

我试图使用我调用的自定义操作符使其工作AutoCloseOperator,并且它几乎正常工作,但不太正确.即我仍然是竞争条件和/或泄漏,例如资源没有关闭.

在RxJava中执行此操作的正确方法是什么?

说我有这个代码:

final AutoCloseOperator<MyResource> autoClose = new AutoCloseOperator<MyResource>();
Subject<MyResource, MyResource> subject = PublishSubject.create();
Observable<MyResource> o = subject.lift(autoClose);

Subscription s1 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s1 handling " + myObj);
    }
});

subject.onNext(new MyResource(1));
subject.onNext(new MyResource(2)); // This should close Resource #1 after Resource #2 is delivered

Subscription s2 = o.subscribe(new Action1<MyResource>() {
    public void call(MyResource myObj) {
        System.out.println("s2 handling " + myObj);
    }
});

subject.onNext(new MyResource(3));
subject.onNext(new MyResource(4));

s1.unsubscribe();

subject.onNext(new MyResource(5));
subject.onNext(new MyResource(6));

s2.unsubscribe();

subject.onNext(new MyResource(7));
subject.onNext(new MyResource(8));
Run Code Online (Sandbox Code Playgroud)

然后我会期待以下行为:

s1 handling Resource #1
s1 handling Resource #2
Closing Resource #1
s1 handling Resource #3
Closing Resource #2
s2 handling Resource #3
s1 handling Resource #4
s2 handling Resource #4
Closing Resource #3
s2 handling Resource #5
Closing Resource #4
s2 handling Resource #6
Closing Resource #5
Closing Resource #6
Closing Resource #7
Closing Resource #8
Run Code Online (Sandbox Code Playgroud)

注意:我没有PublishSubject在我的真实代码中使用,我只是在这里使用它来进行说明,我使用每次更新数据库表时都会Observable.create发出Cursor...

概括问题:我可以使用doOnNextdoOnUnsubscribe关闭旧项目,但这并没有考虑到这些事件会多次发生(对于每个订阅者),而我只想在所有订阅者都拥有时关闭资源收到了新项目.

是自定义运算符使用lift()的方式,还是有一些现有的运算符可能有助于此?

我已经把我的问题减少到了GitHub上的一个小命令行应用程序.谢谢你的期待!

Dav*_*ten 6

Observable.using() 是你需要的.

如果你t的类型T有一个.close()方法,你想从t(你的光标)中提取一些内容,Observable<R>那么这里是如何做到的:

Func0<T> resourceFactory = () -> t;
Func1<T, Observable<R>> observableFactory = x -> ...
Action1<T> disposeAction = x -> x.close();

Observable<R> results = Observable.using(resourceFactory, observableFactory, disposeAction);
Run Code Online (Sandbox Code Playgroud)

你提到过你Observable<T>.要从所有Ts中获取所有R,请使用上面的代码,如下所示:

Observable<T> source = ...
Observable<R> results = 
    source.flatMap(t -> {
        Func0<T> resourceFactory = () -> t;
        Func1<T, Observable<R>> observableFactory = x -> ...
        Action1<T> disposeAction = x -> x.close();
        return Observable.using(resourceFactory, observableFactory, disposeAction);});
Run Code Online (Sandbox Code Playgroud)