如何在 RxJava 中将单个输出传递给可完成的?

Adi*_*Adi 3 java reactive-programming rx-java rx-android rx-java2

我想调用一个 APIapi1并将API返回的输出传递给第二个 API api2。第一个 API 是获取请求,第二个 API 是POST请求。因此,api1返回 aSingle<String>api2返回 a Completable

函数看起来像这样:

// api1
public Single<String> getToken() {
  ...
}
// api2
public Completable saveTokenToBackend(String token, String userId) {
 ...
}
Run Code Online (Sandbox Code Playgroud)

我想将这两个操作链接在一起。订阅者只关心获取令牌并保存它的过程是否成功。因此,最终操作链的返回类型应该是 a Completable。但是,当我这样做时,API 要求api2停止发生。仅api1根据日志成功运行。

Single<Completable> r1 = getToken().map(t -> saveTokenToBackend(t, userId));
Completable r2 = Completable.fromSingle(r1);
Run Code Online (Sandbox Code Playgroud)

我在这里更广泛的问题是,我如何将响应从 Single 链接到 Completable?

第二个问题是为什么上面的代码不起作用?

::编辑::

根据评论中的建议,我尝试了:

public Completable getAndSaveToken() {
    getToken().flatMapCompletable(t -> saveTokenToBackend(t, "dummyuser");
}
Run Code Online (Sandbox Code Playgroud)

在我的应用程序代码中,我正在做:

getAndSaveToken()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(() -> {
                Log.v(TAG, "call success");
    }, error -> {
                Log.e(TAG, "Error", error);
    });
Run Code Online (Sandbox Code Playgroud)

结果是: java.io.IOException: Must not be called on the main application thread

Ign*_*aca 5

你有两个不同的问题,组合和调度。上下文:getToken返回单个并saveToken(token)返回一个可完成的。

composition:要组合一个单一的和一个可完成的,正如您已经注意到的,您可以使用 flatMap 运算符,这将返回一个新的可完成的,它首先获取令牌,然后保存它(可能,您需要在之前修改令牌保存它;)

getToken().flatMapCompletable(n -> saveToken(n)) // returns a completable
Run Code Online (Sandbox Code Playgroud)

如果您想将其保留为单个,则可以将其映射回第一个实例:

getToken().flatMap(n -> saveToken(n).toSingleDefault(n)) // returns a single
Run Code Online (Sandbox Code Playgroud)

调度:在android中,你不能在主线程中启动一个请求。为了避免它,您可以使用subscribeOn运算符,看起来您也已经注意到

getToken().subscribeOn(io()).doOnNext(n -> {/*this should be evaluated in io thread*/})
getToken().subscribeOn(io()).observeOn(mainThread()).doOnNext(n -> {/*this on mainThread*/})
Run Code Online (Sandbox Code Playgroud)

如果仍然出现错误,则observeOnsubscribeOn已在某处重新配置。需要更多代码才能确定。但无论如何,您可以断言这两个请求都在应用运算符两次的 io 线程中执行:

getToken().subscribeOn(io).flatMapCompletable(token -> saveToken(token).subscribeOn(io()))
Run Code Online (Sandbox Code Playgroud)

另一种选择,如果您正在使用改造,是应用RxJava2CallAdapterFactory使用createWithScheduler(io())工厂,而不是默认的。这将断言所有请求都是在 io() 中创建的,因此您可以组合和准备数据,最后应用observeOn(mainThread())来更新 UI。