takeWhile 完成后的链式 Observable 没有被调用?

xai*_*oft 5 observable rxjs

我有以下方法,应该这样调用:

  1. registerDomain应该被调用并且应该返回operationId
  2. 几秒后10getOperationDetail应该称为传入operationId
  3. getOperationDetail应每秒调用一次10,直到successful返回。
  4. 一旦getOperationDetail完成,createRecordSets应该被调用。
  5. 最后,getChangeStatus应该调用直到它返回INSYNC
  6. 如果任何 api 调用抛出异常,我该如何在客户端处理错误?

下面的代码调用 registerDomain 和 getOperationDetail,但 getOperationDetail 完成后,它不会移至 createRecordSets。

  registerDomain(domain) {
    return this._adminService.registerDomain(domain)
      .concatMap(operation => this.getOperationDetail(operation.OperationId))
      .concatMap(() => this._adminService.createRecordSets(domain));
  }

  getOperationDetail(operationId) {
    return Observable.interval(10000)
      .mergeMap(() => this._adminService.getOperationDetail(operationId))
      .takeWhile((info) => info.Status.Value !== 'SUCCESSFUL');
  }
  createRecordSets(caseWebsiteUrl) {
    return this._adminService.createRecordSets(caseWebsiteUrl.Url)
        .concatMap(registerId => this.getChangeStatus(registerId));
  }

  getChangeStatus(registerId) {
    return Observable.interval(5000)
      .mergeMap(() => this._adminService.getChange(registerId))
      .takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC');
  }
Run Code Online (Sandbox Code Playgroud)

我更新getOperationDetail为使用first运算符:

  getOperationDetail(operationId) {
    return Observable.interval(3000)
      .mergeMap(() => this._adminService.getOperationDetail(operationId))
      .first((info) =>  info.Status.Value === 'SUCCESSFUL')

  }
Run Code Online (Sandbox Code Playgroud)

现在它确实调用了createRecordSets,但是之后createRecordSets,它继续调用了getOperationDetail大约 13 次,最终调用了getChangeStatus。从我的角度来看,我认为:

  1. 调用getOperationDetail直到收到SUCCESS.
  2. 打电话createRecordSets一次。
  3. 呼叫getChangeStatus直至收到INSYNC
  4. 完毕。

为什么要额外调用?

我将 registerDomain 更改为如下所示:

 registerDomain(domain) {
    return this._adminService.registerDomain(domain)
      .concatMap(operation => this.getOperationDetail(operation.OperationId))
        .concatMap((op) => this.createRecordSets(op));
Run Code Online (Sandbox Code Playgroud)

在我被.concatMap((op) => this.createRecordSets(op))锁住之前this.getOperationDetail。一旦我把它移到外面,它就开始按预期工作。但我不确定为什么。有人可以解释一下吗?

Ser*_*aev 6

takeWhile满足满足指定条件的值时,它会完成可观察值而不传播该值。这意味着下一个链式运算符将不会收到该值,也不会调用其回调。

\n\n

假设在您的示例中,前两次调用导致this._adminService.getOperationDetail(...)不成功状态,而第三次调用成功。这意味着返回的可观察值getOperationDetail()只会产生两个info值,每个值都具有不成功状态。同样重要的是,下一个链式concatMap运算符将针对每个不成功的值调用其回调,这意味着createRecordSets()将被调用两次。我想您可能想避免这种情况。

\n\n

我建议改用first运算符:

\n\n
getOperationDetail(operationId) {\n    return Observable.interval(10000)\n        .concatMap(() => this._adminService.getOperationDetail(operationId))\n        .first(info => info.Status.Value !== \'SUCCESSFUL\');\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

这种方式getOperationDetail()一旦成功就只会产生一个“成功”值this._adminService.getOperationDetail(operationId)。该first运算符发出与指定条件匹配的源可观察值的第一个值,然后完成。

\n\n

当涉及到错误处理时,catchretry运算符可能会很有用。

\n\n

更新:

\n\n

您遇到的意外行为(完成getOperationDetail()后不断被调用first())似乎. 正如本期rxjs所述,

\n\n
\n

每个 take-ish 运算符(早于其源 Observable 完成的运算符)在与延长订阅的运算符(此处为 switchMap)结合使用时,将继续订阅源 Observable。

\n
\n\n

first都是takeWhile此类操作符的示例,而“延长”订阅的操作符例如是switchMapconcatMapmergeMapconcatMap在下面的示例中,当内部可观察值发出值时,数字将被保留记录:

\n\n
var takeish$ = Rx.Observable.interval(200)\n    // will keep logging until inner observable of `concatMap` is completed\n    .do(x => console.log(x))\n    .takeWhile(x => x < 2);\n\nvar source = takeish$\n    .concatMap(x => Rx.Observable.interval(200).take(10))\n    .subscribe();\n
Run Code Online (Sandbox Code Playgroud)\n\n

看起来可以通过将包含此类 take-ish 运算符的可观察量转换为高阶可观察量 \xe2\x80\x94 来解决此问题,方法与您所做的类似:

\n\n
var takeish$ = Rx.Observable.interval(200)\n    // will log only 0, 1, 2\n    .do(x => console.log(x))\n    .takeWhile(x => x < 2);\n\nvar source = Rx.Observable.of(null)\n    .switchMap(() => takeish$)\n    .concatMap(x => Rx.Observable.interval(200).take(1))\n    .subscribe();\n
Run Code Online (Sandbox Code Playgroud)\n\n

更新2:

\n\n

从 rxjs 版本 5.4.2 开始,上述错误似乎仍然存在。例如,它会影响当满足指定条件时是否first取消订阅运算符的源可观察对象。firstfirst运算符后紧跟 时concatMap,其源可观察量将不会取消订阅,并且将继续发出值,直到内部可观察量concatMap完成。在你的情况下,这意味着this._adminService.getOperationDetail()将继续被调用,直到 observable 返回createRecordSets()完成为止。

\n\n

这是简化的示例以说明行为:

\n\n

\r\n
\r\n
getOperationDetail(operationId) {\n    return Observable.interval(10000)\n        .concatMap(() => this._adminService.getOperationDetail(operationId))\n        .first(info => info.Status.Value !== \'SUCCESSFUL\');\n}\n
Run Code Online (Sandbox Code Playgroud)\r\n
var takeish$ = Rx.Observable.interval(200)\n    // will keep logging until inner observable of `concatMap` is completed\n    .do(x => console.log(x))\n    .takeWhile(x => x < 2);\n\nvar source = takeish$\n    .concatMap(x => Rx.Observable.interval(200).take(10))\n    .subscribe();\n
Run Code Online (Sandbox Code Playgroud)\r\n
\r\n
\r\n

\n\n

如果我们扩展第一个运算符的内部可观察量concatMap,我们将得到以下可观察量:

\n\n
Rx.Observable.interval(100)\n    .do(x => console.log(x))\n    .first(x => x === 2)\n    .concatMap(() => Rx.Observable.interval(200).take(5));\n
Run Code Online (Sandbox Code Playgroud)\n\n

请注意,first紧随其后的是防止操作符(即)concatMap的源可观察值被取消订阅。值将继续被记录(或者在您的情况下,服务调用将继续发送),直到(ie ) 的内部可观察完成。firstinterval(100).do(x => console.log(x)concatMapinterval(200).take(5)

\n\n

如果我们修改上面的示例并将第二个移出concatMap第一个的内部可观察量concatMapfirst则将不再与它链接,并且一旦满足条件就会取消订阅源可观察量,这意味着该间隔将停止发出值并且将不会记录更多号码(或者不会发送更多服务请求):

\n\n

\r\n
\r\n
var takeish$ = Rx.Observable.interval(200)\n    // will log only 0, 1, 2\n    .do(x => console.log(x))\n    .takeWhile(x => x < 2);\n\nvar source = Rx.Observable.of(null)\n    .switchMap(() => takeish$)\n    .concatMap(x => Rx.Observable.interval(200).take(1))\n    .subscribe();\n
Run Code Online (Sandbox Code Playgroud)\r\n
function registerDomain() {\r\n    return Rx.Observable.of("operation")\r\n        .concatMap(() => getOperationDetail()\r\n            .concatMap(() => Rx.Observable.interval(200).take(5)));\r\n}\r\n\r\nfunction getOperationDetail() {\r\n    return Rx.Observable.interval(100)\r\n        // console.log() instead of the actual service call\r\n        .do(x => console.log(x))\r\n        .first(x => x === 2);\r\n}\r\n\r\nregisterDomain().subscribe();
Run Code Online (Sandbox Code Playgroud)\r\n
\r\n
\r\n

\n\n

在这种情况下,内部可观察可以简单地扩展到:

\n\n
Rx.Observable.interval(100)\n    .do(x => console.log(x))\n    .first(x => x === 2)\n
Run Code Online (Sandbox Code Playgroud)\n\n

请注意,first后面不再是concatMap

\n\n

还值得一提的是,在这两种情况下,由返回的 observableregisterDomain()产生完全相同的值,如果我们将日志记录从do()操作符移至subscribe(),则在两种情况下相同的值将被写入控制台:

\n\n
registerDomain.subscribe(x => console.log(x));\n
Run Code Online (Sandbox Code Playgroud)\n