如何在RxJava中继承Observable?

dir*_*rls 17 rx-java

我知道你应该不惜一切代价避免这种情况,但如果我在RxJava中有一个子类Observable的有效用例怎么办?可能吗?我怎么能这样做?

在这种特定情况下,我有一个"存储库"类,它当前返回请求:

class Request<T> {
    public abstract Object key();
    public abstract Observable<T> asObservable();

    [...]

    public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
        Request<T> self = this;
        return new Request<T>() {
             @Override public Object key() { return self.key; }
             @Override public Observable<T> asObservable() { return transformation.call(self); }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我使用transform方法在需要请求键的上下文中修改响应observable(asObservable)(如缓存):

 service.getItemList() // <- returns a Request<List<Item>>
     .transform(r -> r.asObservable()
             // The activity is the current Activity in Android
             .compose(Operators.ensureThereIsAnAccount(activity))
             // The cache comes last because we don't need auth for cached responses
             .compose(cache.cacheTransformation(r.key())))
     .asObservable()
     [...  your common RxJava code ...]
Run Code Online (Sandbox Code Playgroud)

现在,如果我的Request类是一个Observable子类,那将非常方便,因为我可以消除所有.asObservable()调用,客户端甚至不需要知道我的Request类.

aka*_*okd 19

可以继承Observable(我们为Subjects和ConnectableObservables 执行此操作),但它需要额外考虑,因为您需要传入OnSubscribe回调来处理传入的Subscribers.我不清楚你的请求应该做什么,以防有人订阅它,所以我将给你两个扩展Observable的例子:

没有共享可变状态的Observable

如果您没有在订阅者之间共享可变状态,则可以只扩展Observable并将您的操作传递给 super

public final class MyObservable extends Observable<Long> {
    public MyObservable() {
        super(new OnSubscribe<Long>() {
            @Override public void call(Subscriber<? super Long> child) {
                child.onNext(System.currentTimeMillis());
                child.onCompleted();
            }
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

具有共享可变状态的可观察

这个通常比较棘手,因为你需要从OnSubscribe方法和Observable的方法访问共享状态,但Java不会让你OnSubscribesuper完成之前触及内部类的实例字段.解决方案是分解出这样一个共享状态和OnSubscribe构造函数,并使用静态工厂方法来设置两者:

public final class MySharedObservable extends Observable<Long> {
    public static MySharedObservable create() {
        final AtomicLong counter = new AtomicLong();
        OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> t1) {
                t1.onNext(counter.incrementAndGet());
                t1.onCompleted();
            }
        };
        return new MySharedObservable(onSubscribe, counter);
    }
    private AtomicLong counter;

    private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
        super(onSubscribe);
        this.counter = counter;
    }
    public long getCounter() {
        return counter.get();
    }
}
Run Code Online (Sandbox Code Playgroud)