在可观察链中应用调度程序两次(使用compose)

Pet*_*r F 5 java android rx-java

我的应用中有多个网络电话.我喜欢在这个变换器中使用compose运算符在IO线程中运行网络请求:

public static <T> Transformer<T, T> runOnIoThread()
{
    return tObservable -> tObservable.subscribeOn( Schedulers.io() )
        .observeOn( AndroidSchedulers.mainThread() );
}
Run Code Online (Sandbox Code Playgroud)

只要我只有一个网络呼叫,这似乎运作良好.但是,如果我按照以下示例链接它们,我将获得Android的NetworkInMainThreadException.

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .compose( runOnIoThread() );
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .compose( runOnIoThread() );
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) );
}
Run Code Online (Sandbox Code Playgroud)

之前我的想法是compose在调用之前应用于完整的可观察链,而后来的compose调用将"覆盖"前一个的行​​为.但实际上,看起来observeOn第一个compose(observeOn主线程)的compose调用支配第二个调用(subscribeOnIO线程).一个明显的解决方案是有两个版本networkCall1- 一个应用调度程序而另一个不应用.但是,这会使我的代码非常冗长.

你知道更好的解决方案吗?你能解释一下在一个可观察的链中应用调度程序两次(使用compose)的行为吗?

编辑:我正在使用RxJava进行网络调用改造.

Jah*_*old 8

subscribeOn()每个流只能使用一次.如果你第二次使用它,它将不会做任何事情.因此,当您将两种方法链接在一起时,您运行:

observeOn(AndroidSchedulers.mainThread())

它将操作切换到主线程.之后它会停留在那里,因为下一个subscribeOn()实际上被忽略了.

我建议你实际上用你的compose方法过度复杂化了.只需添加

subscribeOn(Schedulers.io())

两个网络电话然后再使用

observeOn(AndroidSchedulers.mainThread())

就在您想要在主线程上处理结果之前.你最终得到的结果如下:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io);
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io);
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)

编辑

如果您真的希望observeOn()通过单独的网络呼叫方法进行呼叫,则可以.您必须在方法中添加额外observeOn()的内容chainedCalls().observeOn()每个流可以拥有任意数量的呼叫.它会是这样的:

public Observable<String> networkCall1()
{
    return <NETWORK_CALL_1()>
            .subscribeOn(Schedulers.io)
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> networkCall2( String input )
{
    return <NETWORK_CALL_2(input)>
            .subscribeOn(Schedulers.io)
            .observeOn(AndroidSchedulers.mainThread());
}

public Observable<String> chainedCalls()
{
    return networkCall1()
            .observeOn(Schedulers.io)
            .flatMap( result1 -> networkCall2( result1 ) )
            .observeOn(AndroidSchedulers.mainThread());
}
Run Code Online (Sandbox Code Playgroud)