在运行另一个observable之前使用concatMap运行Single

ant*_*009 8 kotlin rx-java concatmap

Android Studio 3.1 RC 2
kotlin 1.2.30
Run Code Online (Sandbox Code Playgroud)

Java中fetchMessage的签名

Single<Response> fetchMessage(final String Id); 
Run Code Online (Sandbox Code Playgroud)

kotlin代码

fun translate(Id: String): Completable {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toCompletable()
    }
Run Code Online (Sandbox Code Playgroud)

我想在fetchMessage之前运行的方法

 fun insertMessage(Id: String): Completable {
        return Completable.fromCallable {
            insert(Id, State.IDLE)
        }
    }
Run Code Online (Sandbox Code Playgroud)

我希望insertMessage e以某种方式在fetchMessage之前运行.我在考虑使用concatMap但不确定如何组合translate和insertMessage.这样insertMessage将首先运行,然后一旦完成,将运行translate.

非常感谢任何建议,

Update solution 1 using startWith(..):
Run Code Online (Sandbox Code Playgroud)

通过将translate的返回值更改为Single.我这样做了:

fun translate(Id: String): Single<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
    }
Run Code Online (Sandbox Code Playgroud)

然后我可以有一个方法来执行以下insertMessage(..) - > translate(..):

translate(Id).toCompletable().startWith(insertMessage(id, State.IDLE))
Run Code Online (Sandbox Code Playgroud)

这是一个理想的解决方案吗?

Update solution 2 using concatWith(..):
Run Code Online (Sandbox Code Playgroud)

我返回一个Observable并在链中调用toObservable().

fun translate(Id: String): Observable<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toObservable()
    }
Run Code Online (Sandbox Code Playgroud)

我可以使用concatWith,因此序列将是insertMessage(..) - > translate(..):

translate(Id).toCompletable().concatWith(insertMessage(id, State.IDLE).toObservable())
    .toCompletable()
Run Code Online (Sandbox Code Playgroud)

这些正确的解决方案?

aka*_*okd 5

如果您有Completable,您可以通过以下方式链接任何其他反应类型andThen:

insertMessage("id")
.andThen(translate("id"))
Run Code Online (Sandbox Code Playgroud)


Tpo*_*6oH 2

你的两个选择都是有道理的,但我建议你稍微清理一下。

首先,您需要清楚地了解每种情况下使用什么返回类型:ObservableSingleCompletable

定义如下:

  • Single表示发出单个值或错误的 Observable。
  • Completable表示不发出任何值,而只发出终止事件(onError 或 onCompleted)的 Observable。

在这两种情况下,您不需要返回任何数据,您所需要的只是知道操作是否成功。Completable 正是为了处理这种情况而设计的。

所以我建议你:

fun translate(Id: String): Completable {
    return repository.fetchMessage(Id)
             .flatMapCompletable {
                Completable.fromAction {
                  update(messageId, it, State.COMPLETED)
                }
             }.doOnError {
                update(Id, null, State.ERROR)
             }
}

fun insertMessage(Id: String): Completable {
    return Completable.fromCallable {
        insert(Id, State.IDLE)
    }
}
Run Code Online (Sandbox Code Playgroud)

使代码更简洁的好选择是使用Completable.fromAction而不是Completable.fromCallable,因此您不需要返回任何内容。

然后您可以使用任何选项startWithconcatWith。两者都会等到第一个可观察对象完成后再运行第二个可观察对象。我更喜欢使用 concatWith,因为它按照函数编写的顺序运行函数。

最终我们得到了一个优雅的解决方案:

insertMessage(id).concatWith(translate(id))
Run Code Online (Sandbox Code Playgroud)

或者

translate(id).startWith(insertMessage(id))
Run Code Online (Sandbox Code Playgroud)

有关 concat 的更多信息:http://reactivex.io/documentation/operators/concat.html

如果您好奇的话,这里是 rxJava 库中函数的实现:

public final Completable startWith(Completable other) {
    requireNonNull(other);
    return concat(other, this);
}

public final Completable concatWith(Completable other) {
    requireNonNull(other);
    return concat(this, other);
}
Run Code Online (Sandbox Code Playgroud)

如您所见,唯一的区别是顺序。