RXJava - 创建一个可暂停的可观察对象(例如缓冲区和窗口)

pro*_*m85 16 android observable rx-java

我想创建以下内容的observable:

  • 缓冲所有项目,同时暂停
  • 立即发出物品,而不是暂停
  • 暂停/恢复触发器必须来自另一个可观察者
  • 它必须保存以供不在主线程上运行的observable使用,并且必须保存以更改主线程中的暂停/恢复状态

我想使用BehaviorSubject<Boolean>as触发器并将此触发器绑定到activity onResumeonPause事件.(附加代码示例)

我已经设置了一些东西,但它没有按预期工作.我用它如下:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Run Code Online (Sandbox Code Playgroud)

目前的问题是,Variant 1应该可以正常工作,但有时,事件只是没有发出 - 阀门没有发射,直到阀门一切正常工作(可能是一个穿线问题......)!解决方案2更简单,似乎有效,但我不确定它是否真的更好,我不这么认为.我实际上不确定,为什么解决方案有时会失败,所以我不确定解决方案2是否解决了(目前对我不知道)问题...

有人可以告诉我可能是什么问题或者简单的解决方案应该可靠地工作吗?或者给我一个可靠的解决方案?

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser功能

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}
Run Code Online (Sandbox Code Playgroud)

活动

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}
Run Code Online (Sandbox Code Playgroud)

Ale*_*tov 3

您实际上可以使用.buffer()运算符将​​其传递为可观察的,定义何时停止缓冲,来自书中的示例:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);
Run Code Online (Sandbox Code Playgroud)

来自第 5 章“驯服序列”:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted% 20个序列.md

您可以使用PublishSubjectasObservable在自定义运算符中为其提供元素。每次需要开始缓冲时,通过以下方式创建实例Observable.defer(() -> createBufferingValve())