标签: rx-java2

在RxJava 2.X中暂停并恢复基于布尔门的observable?

假设我有一个处理器,当按下按钮时会发出一个布尔值,将其视为切换.

    boolean gateValue = true;
    PublishProcessor<Boolean> gate = PublishProcessor.create();
    view.onButtonClicked()
            .subscribe(new Action1<Void>() {
                @Override
                public void call(final Void aVoid) {
                    gate.onNext(gateValue = !gateValue);
                }
            }));
Run Code Online (Sandbox Code Playgroud)

我想要做的是使用门的值来暂停和恢复可观察的序列,在暂停时缓冲发出的值.

我已经阅读了很多内容,虽然它似乎可以在其他语言的反应式扩展中使用,但RxJava似乎并不支持它.

这是我想要实现的一个例子,它只是每秒输出一个增量值.当我按下按钮时,我希望输出停止,直到我再次按下它,这将输出两次按下按钮之间发出的每个项目:

Flowable.interval(1, TimeUnit.SECONDS)
                .bufferWhile(gate)
                .flatMapIterable(longs -> longs)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(final Long aLong) throws Exception {
                        view.displayTime(aLong);
                    }
                });
Run Code Online (Sandbox Code Playgroud)

有谁知道如何实现这样的事情?

编辑我写了一篇关于如何实现这个的博客文章https://medium.com/@scottalancooper/pausing-and-resuming-a-stream-in-rxjava-988a0977b771#.gj7fsi1xk

android reactive-programming rx-java rx-java2

6
推荐指数
1
解决办法
1215
查看次数

RXJava2.我是否需要一次发出的处理流?(单身,也许)

我的代码中有很多Single,比如

Disposable disp = Single.fromCallable(()-> loadData())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe(res-> showInUI(res),
                    throwable -> Log.e(TAG, throwable.getMessage()))
            );
Run Code Online (Sandbox Code Playgroud)

当我从资料了解,之间的差别Observable,并Single是单可以响应错误,决不响应,有成功的回答它发射只有一次.现在我没有在任何地方处置,一切正常.

所以我需要执行disp.dispose()吗?

java rx-java rx-java2

6
推荐指数
1
解决办法
2096
查看次数

改造JavaRx2线程中断

我正在创建嵌套请求,如下所示(省略了一些错误处理):

return Single.create((SingleOnSubscribe<String>) emitter -> getPages()
    .subscribe(pages -> getPageData(emitter, pages), emitter::onError))
    .compose(applySchedulers());

    // ...

private void getPageData(SingleEmitter<String> emitter, List<Page> pages) {
    service.getPage(pages.get(0).id)
            .subscribe(emitter::onSuccess, e -> {
                pages.remove(0);
                getPageData(emitter, pages);
            });
}
Run Code Online (Sandbox Code Playgroud)

我曾经有一个迭代解决方案,它产生了相同的结果.页面列表按顺序排序,应按原样处理.如果连接良好,这部分代码可以正常工作,但是如果我碰巧遇到了错误的连接java.io.InterruptedIOException: thread interrupted.解决这个问题的好方法是什么?

编辑:

堆栈跟踪:

W/System.err: java.io.InterruptedIOException: thread interrupted
W/System.err:     at okio.Timeout.throwIfReached(Timeout.java:145)
W/System.err:     at okio.Okio$2.read(Okio.java:136)
W/System.err:     at okio.AsyncTimeout$2.read(AsyncTimeout.java:237)
W/System.err:     at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err:     at okhttp3.internal.http1.Http1Codec$ChunkedSource.read(Http1Codec.java:429)
W/System.err:     at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err:     at okio.RealBufferedSource.exhausted(RealBufferedSource.java:56)
W/System.err:     at okio.InflaterSource.refill(InflaterSource.java:101)
W/System.err:     at okio.InflaterSource.read(InflaterSource.java:62)
W/System.err:     at okio.GzipSource.read(GzipSource.java:80)
W/System.err:     at okio.RealBufferedSource.read(RealBufferedSource.java:46)
W/System.err:     at okio.ForwardingSource.read(ForwardingSource.java:35)
W/System.err:     at …
Run Code Online (Sandbox Code Playgroud)

android retrofit2 rx-java2

6
推荐指数
1
解决办法
949
查看次数

RxJava获取链式网络请求中先前可观察到的结果

我的应用程序中有2个服务端点,a并且b(两个都是“ Singles”)。服务请求b取决于的响应a。在响应之后,b我需要访问订户中的两个响应。我打算这样打电话:

services.a()
    .flatMap(a -> services.b(a))
    .subscribe(b ->
        // Access a and b here
    )
Run Code Online (Sandbox Code Playgroud)

但是通过这种方式,我只能访问b订户中的结果。我又如何传递对此的回应a

我的第一个尝试是使用类似这样的东西:

// Note: Code will not compile... Just to show the concept
services.a()
    .flatMap(
        a -> Observable.combineLatest(
                Observable.just(a)
                services.b(a).toObservable()
                Bifunction((a, b) -> {return Pair<ResponseA, ResponseB>(a, b)}))
    )
    .toSingle()
    .subscribe(pair -> {
        ResponseA a = pair.first();
        ResponseB b = pair.second();
    })
Run Code Online (Sandbox Code Playgroud)

但是随着用例变得更加复杂,代码将演变为丑陋的怪物。

java rx-java rx-java2

6
推荐指数
1
解决办法
1616
查看次数

每分钟rxjava重复一次观察的最佳方法

我有以下方法:

public class ParentalControlInteractor {
   public Single<Boolean> isPinSet() {
       return bamSdk.getPinManager().isPINSet();
   }
}
Run Code Online (Sandbox Code Playgroud)

我想调用此函数运行一次,然后每分钟重复一次,直到无穷大但这看起来很笨拙:

    parentalControlInteractor.isPinSet()
            .subscribeOn(Schedulers.io())
            .repeat(10000)
            .timeout(1600,TimeUnit.MILLISECONDS)
            .doOnError(throwable -> {
                Timber.e(throwable,"Error getting if Pin is set");
                throwable.printStackTrace();
            })
            .subscribe(isPinSet -> {
                this.isPinSet = isPinSet;
                Timber.d("Pin is set = " + isPinSet.toString());
                });
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法呢?我正在使用RxJava2.此外,上述方法仅调用10000次.我想永远调用它,就像使用Handler.postDelayed()一样.

android repeat rx-java2

6
推荐指数
5
解决办法
1万
查看次数

在处理时停止rxJava可观察链执行

尽管调试rxJava网络电话在一个应用程序我碰到这样一种情况,如果我们disposeclear处置对象通过链的订阅返回observables那么只有第一个observable得到处理而不是其他链接observablesflatMap.

看看下面的演示代码片段:

CompositeDisposable testCompositeDisposal = new CompositeDisposable();

private void testLoadData() {
    Disposable disposable = Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "First: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    }).subscribeOn(Schedulers.io()).flatMap(value -> Observable.create(sbr -> {
        for (int i = 0; i < 5; i++) {
            Thread.sleep(3000);
            Log.w("Debug: ", "Second: " + i);
            sbr.onNext(true);
        }
        sbr.onComplete();
    })).doOnNext(value -> {
        Log.w("Debug: ", "doONNext"); …
Run Code Online (Sandbox Code Playgroud)

android rx-java rx-java2

6
推荐指数
1
解决办法
4928
查看次数

使用RxJava2和Retrofit2时如何访问响应头?

前提

我正在开发一个简单的应用程序,我想在RecyclerView中列出用户的GitHub存储库.我在构建它时使用作为我的端点.

问题

我面临的问题是GitHub API一次只返回30个repos.为了获得更多,我可以per_page=100在我的查询字符串中附加一个(100是最大值); 但是,我们如何处理超过100个回购的用户?

API 文档提供的解决方案是next;从"链接"响应标头获取URL以进行第二次API调用.

怎么会这样呢?谢谢!

android github-api retrofit2 clean-architecture rx-java2

6
推荐指数
1
解决办法
1545
查看次数

Kotlin和RxJava2 zip运算符-使用提供的参数无法调用以下函数

使用Android Studio 3.0 IDE在Kotlin中编写.zip Observable时遇到问题。

这是我的代码:

internal var updateStringEventsSubject = PublishSubject.create<String>()
internal var updateIntEventsSubject = PublishSubject.create<Int>()

internal var triggerSave = PublishSubject.create<Boolean>()
internal var triggerStopAndSave = PublishSubject.create<Boolean>()

internal var normalSaveTrigger = triggerSave.debounce(30, TimeUnit.SECONDS)
internal var trigger = Observable.merge(normalSaveTrigger, triggerStopAndSave)

private fun saveEvents(
        strings: List<String>,
        integers: List<Int>,
        someBoolean: Boolean): Boolean {

    return true //doesn't matter for now
}

fun temp() {
    Observable.zip<List<String>, List<Int>, Boolean, Boolean>(updateStringEventsSubject.buffer(trigger),
            updateIntEventsSubject.buffer(trigger),
            trigger, { strings: List<String>, integers: List<Int>, someBoolean: Boolean -> saveEvents(strings, integers, someBoolean) })
            .subscribe()
}
Run Code Online (Sandbox Code Playgroud)

但是,我的IDE(Android …

kotlin android-studio rx-java rx-java2 android-studio-3.0

6
推荐指数
1
解决办法
2286
查看次数

在主线程上调用doOnSubscribe

在阅读了多篇博客文章和文档后,我得出结论,以下doOnSubscribe将在工作线程上执行:

Observable.just(1)
            .observeOn(Schedulers.io())
            .doOnSubscribe(__ -> Log.d("Testing", "Testing")) // Shouldn't this be on worker thread?
            .subscribe();
Run Code Online (Sandbox Code Playgroud)

但是在调试之后,我看到doOnSubscribe在主线程上执行了.我认为doOnSubscribe它与其他运算符类似,因此在与subscribeOn和相结合时具有类似的线程行为observeOn.

我错过了什么?如何将doOnSubscribe执行移至后台线程?

android reactive-programming rx-java2

6
推荐指数
2
解决办法
1480
查看次数

如何在RxJava中正确转换多播可观察对象

假设我有一个事件发射数据源,我想将其转换为反应流.数据源由资源绑定(例如,定期发送更新状态的套接字),因此我希望共享该资源的单个订阅.使用单个observable replay(对于新订户立即获得当前值),refCount运营商似乎非常适合这种情况.例如,这是他的MyDataProvider单身人士的样子:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}
Run Code Online (Sandbox Code Playgroud)

但是,现在假设我有另一个数据源需要第一个数据源的结果来计算自己的值:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}
Run Code Online (Sandbox Code Playgroud)

在这里,我的设置开始崩溃.第一个可观测量的多播仅适用于refCount操作员.在那之后,一切都再次单播.这意味着如果进行两次单独的订阅anotherDataProvider,flatMap则将调用operator两次.我看到了两个解决方法,但我不喜欢这两个:

1.在多播发生之前转换第一个observable …

java android reactive-programming rx-java rx-java2

6
推荐指数
1
解决办法
597
查看次数