似乎这两个功能非常相似.它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func
),并且它们的大理石图看起来完全相同.不能在这里粘贴图片,但这里是concatMap的一个,这里是flatMap的一个.在结果的描述中似乎存在一些细微的差别Observable
,其中一个由concatMap
包含由结合的Observable产生的flatMap
项产生,而由包含由首先合并结果的Observable 产生的项产生,并且发出该合并的结果.
然而,这种微妙之处对我来说完全不清楚.任何人都可以更好地解释这种差异,并且理想地给出一些说明这种差异的例子.
Android Studio 3.1 RC 2
kotlin 1.2.30
Run Code Online (Sandbox Code Playgroud)
Java中fetchMessage的签名
Single<Response> fetchMessage(final String Id);
Run Code Online (Sandbox Code Playgroud)
kotlin代码
fun translate(Id: String): Completable {
return repository.fetchMessage(Id)
.flatMap {
Single.fromCallable<State>({
update(messageId, it, State.COMPLETED)
State.COMPLETED
})
}
.onErrorReturn({
update(Id, null, State.ERROR)
State.ERROR
})
.toCompletable()
}
Run Code Online (Sandbox Code Playgroud)
我想在fetchMessage之前运行的方法
fun insertMessage(Id: String): Completable {
return Completable.fromCallable {
insert(Id, State.IDLE)
}
}
Run Code Online (Sandbox Code Playgroud)
我希望insertMessage e以某种方式在fetchMessage之前运行.我在考虑使用concatMap但不确定如何组合translate和insertMessage.这样insertMessage将首先运行,然后一旦完成,将运行translate.
非常感谢任何建议,
Update solution 1 using startWith(..):
Run Code Online (Sandbox Code Playgroud)
通过将translate的返回值更改为Single.我这样做了:
fun translate(Id: String): Single<State> {
return repository.fetchMessage(Id)
.flatMap {
Single.fromCallable<State>({
update(messageId, it, State.COMPLETED)
State.COMPLETED
})
}
.onErrorReturn({
update(Id, null, State.ERROR)
State.ERROR …
Run Code Online (Sandbox Code Playgroud) 我使用concatMap
长时间运行的操作一次处理一个项目流。在某些时候,我需要“中断”这个长时间运行的操作,但只针对当前项:
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
// Now I need to interrupt whichever longRunningOperation in
// progress, but I don't want to interrupt the whole stream.
// In other words, I want to force it to move onto the next
// integer.
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost …
Run Code Online (Sandbox Code Playgroud) 我遇到了一个问题,我一直在尝试使用 找到解决方案RxJs
,但似乎找不到适合它的解决方案......
这是我的想法:
concatMap()
避免嵌套订阅,并在前一个请求完成后订阅每个请求。考虑这个非常简化的版本。假设每个of
代表一个完整的 REST 成功请求(稍后将处理错误),并且我将使用该n
参数执行未显示的工作...
const request1 = of('success 1').pipe(
delay(500),
tap(n => console.log('received ' + n)),
);
const request2 = (n) => of('success 2').pipe(
delay(1000),
tap(n => console.log('received ' + n))
);
const request3 = (n) => of('success 3').pipe(
delay(400),
tap(n => console.log('received ' + n))
);
request1.pipe(
concatMap(n => request2(n).pipe(
concatMap(n => request3(n))
))
)
Run Code Online (Sandbox Code Playgroud)
但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道解析为该响应。
因此,使用concatMap()
,我可以正确链接我的依赖 REST 调用,但无法跟踪进度。
虽然我可以通过嵌套订阅轻松跟踪进度,但我正在努力避免这种情况并使用最佳实践方法。 …
可能是一个基本问题,但我有一个 Angular 应用程序,它进行后端服务调用以检索一些数据,然后使用该数据进行另一个后端服务调用。
第二个服务调用依赖于第一个成功完成,所以我使用 RxJS 中的 concatMap() 函数。
但是,我下面的代码仅返回第二个服务调用的数据。我需要从两个服务调用返回的所有数据。
感觉我搞砸了 .pipe 调用,但没有取得太大进展。提前致谢。
getData(id: String): Observable<any[]> {
return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
concatMap(
evt =>
<Observable<any[]>>(
this.http.get<any[]>(
`${this.baseUrl}/path/relatedby/${evt.child_id}`
)
)
),
retry(3),
catchError(this.handleError("getData", []))
);}
Run Code Online (Sandbox Code Playgroud) 根据这个线程, conCatMap和flatmap仅根据项目的发射顺序而不同.所以我做了一个测试并创建了一个简单的整数流,并希望看到它们将以什么顺序发出.我制作了一个小的可观察量,它将在1-5的范围内取数,并将它们乘以2.简单.
以下是flatmap的代码:
myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.v("myapp","from flatMap:"+integer);
}
});
Run Code Online (Sandbox Code Playgroud)
和使用concatMap完全相同的代码:
myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer * 2);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void …
Run Code Online (Sandbox Code Playgroud) 我有以下代码:
Observable.from(modifiedNodes)
.concatMap(node => {
return this.Model.setData(node);
})
.subscribe(() => {
NodeSaved++;
}
Run Code Online (Sandbox Code Playgroud)
如果setData
为任何中间节点抛出错误,则不会恢复下一个节点序列。我们如何修改它,以便即使一个节点出现错误,也将执行下一个节点序列?我读过,onErrorResumeNext
但我不知道如何在这里使用它。
方法签名setData
如下:
public setData(node: INode): Observable<Object>{}
Run Code Online (Sandbox Code Playgroud) 当使用 RXJS 时,我问自己嵌套的 concatMap 是否等同于顺序的。考虑以下示例:
observable1.pipe(
concatMap(result1 => observable2.pipe(
concatMap(result2 => observable3.pipe(
concatMap(result3 => of(result3)
)
)
).susbcribe((result) => {...});
Run Code Online (Sandbox Code Playgroud)
这将result
导致result3
. 为了避免嵌套,我想写成如下:
of(null).pipe(
concatMap(() => observable1),
concatMap(result1 => observable2),
concatMap(result2 => observable3),
concatMap(result3 => of(result3))
).susbcribe((result) => {...});
Run Code Online (Sandbox Code Playgroud)
这也会导致result
存在result3
。即使在细节级别上可能存在差异,我是否可以假设连接可观察量的两种方法都被认为是等效的(例如,关于失败)?
任何帮助表示赞赏...