标签: concatmap

在RxJava中concatMap和flatMap有什么区别

似乎这两个功能非常相似.它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同.不能在这里粘贴图片,但这里是concatMap的一个,这里是flatMap的一个.在结果的描述中似乎存在一些细微的差别Observable,其中一个由concatMap包含由结合的Observable产生的flatMap项产生,而由包含由首先合并结果的Observable 产生的项产生,并且发出该合并的结果.

然而,这种微妙之处对我来说完全不清楚.任何人都可以更好地解释这种差异,并且理想地给出一些说明这种差异的例子.

java rx-java flatmap concatmap

43
推荐指数
4
解决办法
2万
查看次数

在运行另一个observable之前使用concatMap运行Single

Android Studio 3.1 RC 2
kotlin 1.2.30
Run Code Online (Sandbox Code Playgroud)

Java中fetchMessage的签名

Single<Response> fetchMessage(final String Id); 
Run Code Online (Sandbox Code Playgroud)

kotlin代码

fun translate(Id: String): Completable {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR
                })
                .toCompletable()
    }
Run Code Online (Sandbox Code Playgroud)

我想在fetchMessage之前运行的方法

 fun insertMessage(Id: String): Completable {
        return Completable.fromCallable {
            insert(Id, State.IDLE)
        }
    }
Run Code Online (Sandbox Code Playgroud)

我希望insertMessage e以某种方式在fetchMessage之前运行.我在考虑使用concatMap但不确定如何组合translate和insertMessage.这样insertMessage将首先运行,然后一旦完成,将运行translate.

非常感谢任何建议,

Update solution 1 using startWith(..):
Run Code Online (Sandbox Code Playgroud)

通过将translate的返回值更改为Single.我这样做了:

fun translate(Id: String): Single<State> {
        return repository.fetchMessage(Id)
                .flatMap {
                    Single.fromCallable<State>({
                        update(messageId, it, State.COMPLETED)
                        State.COMPLETED
                    })
                }
                .onErrorReturn({
                    update(Id, null, State.ERROR)
                    State.ERROR …
Run Code Online (Sandbox Code Playgroud)

kotlin rx-java concatmap

8
推荐指数
2
解决办法
976
查看次数

中断concatMap中的单个可观察对象

我使用concatMap长时间运行的操作一次处理一个项目流。在某些时候,我需要“中断”这个长时间运行的操作,但只针对当前项:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost …
Run Code Online (Sandbox Code Playgroud)

concatmap rx-java2

6
推荐指数
1
解决办法
81
查看次数

RXJS 按顺序链接依赖的可观察量,但获取进度条的每次发射

我遇到了一个问题,我一直在尝试使用 找到解决方案RxJs,但似乎找不到适合它的解决方案......

  • 我有 3 个不同的 REST 请求,将按顺序调用,每个请求都需要前一个请求的响应作为参数
  • 我想实现一个进度条,当请求成功时进度条会增加

这是我的想法:

  • 我将使用管道并concatMap()避免嵌套订阅,并在前一个请求完成后订阅每个请求。

考虑这个非常简化的版本。假设每个of代表一个完整的 REST 成功请求(稍后将处理错误),并且我将使用该n参数执行未显示的工作...

const request1 = of('success 1').pipe(
  delay(500),
  tap(n => console.log('received ' + n)),
);

const request2 = (n) => of('success 2').pipe(
  delay(1000),
  tap(n => console.log('received ' + n))
);

const request3 = (n) => of('success 3').pipe(
  delay(400),
  tap(n => console.log('received ' + n))
);

request1.pipe(
  concatMap(n => request2(n).pipe(
    concatMap(n => request3(n))
  ))
)
Run Code Online (Sandbox Code Playgroud)

但是,当我订阅最后一段代码时,我只会得到最后一个请求的响应,这是预期的,因为管道解析为该响应。

因此,使用concatMap(),我可以正确链接我的依赖 REST 调用,但无法跟踪进度。

虽然我可以通过嵌套订阅轻松跟踪进度,但我正在努力避免这种情况并使用最佳实践方法。 …

javascript pipe observable rxjs concatmap

6
推荐指数
1
解决办法
1322
查看次数

Angular - RxJS ConcatMap - 从两个服务调用返回数据

可能是一个基本问题,但我有一个 Angular 应用程序,它进行后端服务调用以检索一些数据,然后使用该数据进行另一个后端服务调用。

第二个服务调用依赖于第一个成功完成,所以我使用 RxJS 中的 concatMap() 函数。

但是,我下面的代码仅返回第二个服务调用的数据。我需要从两个服务调用返回的所有数据。

感觉我搞砸了 .pipe 调用,但没有取得太大进展。提前致谢。

getData(id: String): Observable<any[]> {
return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
  concatMap(
    evt =>
      <Observable<any[]>>(
        this.http.get<any[]>(
          `${this.baseUrl}/path/relatedby/${evt.child_id}`
        )
      )
  ),
  retry(3),
  catchError(this.handleError("getData", []))
);}
Run Code Online (Sandbox Code Playgroud)

service rxjs concatmap angular

5
推荐指数
1
解决办法
7775
查看次数

RxJava - flatmap vs concatMap - 为什么在订阅时订购相同?

根据这个线程, conCatMap和flatmap仅根据项目的发射顺序而不同.所以我做了一个测试并创建了一个简单的整数流,并希望看到它们将以什么顺序发出.我制作了一个小的可观察量,它将在1-5的范围内取数,并将它们乘以2.简单.

以下是flatmap的代码:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });
Run Code Online (Sandbox Code Playgroud)

和使用concatMap完全相同的代码:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void …
Run Code Online (Sandbox Code Playgroud)

rx-java flatmap concatmap

3
推荐指数
1
解决办法
2314
查看次数

ConcatMap 在一个 observable 出现错误时中断

我有以下代码:

Observable.from(modifiedNodes)
    .concatMap(node => {
        return this.Model.setData(node);
    })
    .subscribe(() => {
        NodeSaved++;
    }
Run Code Online (Sandbox Code Playgroud)

如果setData为任何中间节点抛出错误,则不会恢复下一个节点序列。我们如何修改它,以便即使一个节点出现错误,也将执行下一个节点序列?我读过,onErrorResumeNext但我不知道如何在这里使用它。

方法签名setData如下:

public setData(node: INode): Observable<Object>{}
Run Code Online (Sandbox Code Playgroud)

observable rxjs concatmap

3
推荐指数
1
解决办法
3856
查看次数

RXJS - 嵌套 concapMap 是否等同于顺序 concatMap?

当使用 RXJS 时,我问自己嵌套的 concatMap 是否等同于顺序的。考虑以下示例:

observable1.pipe(
   concatMap(result1 => observable2.pipe(
      concatMap(result2 => observable3.pipe(
         concatMap(result3 => of(result3)
      )
   )
).susbcribe((result) => {...});
Run Code Online (Sandbox Code Playgroud)

这将result导致result3. 为了避免嵌套,我想写成如下:

of(null).pipe(
   concatMap(() => observable1),
   concatMap(result1 => observable2),
   concatMap(result2 => observable3),
   concatMap(result3 => of(result3))
).susbcribe((result) => {...});
Run Code Online (Sandbox Code Playgroud)

这也会导致result存在result3。即使在细节级别上可能存在差异,我是否可以假设连接可观察量的两种方法都被认为是等效的(例如,关于失败)?

任何帮助表示赞赏...

javascript rxjs concatmap

2
推荐指数
1
解决办法
900
查看次数