RxJS 6 扩展不是无限的?

Tim*_*ter 3 rxjs rxjs6

我一直在玩 RxJS 来探索边缘并遇到了一些意想不到的事情。

考虑以下代码:

of(1)
.pipe(expand(x => of(x + 1)))
.subscribe(
    x => console.log(x), 
    err => console.log(err), 
    () => console.log('complete'));
Run Code Online (Sandbox Code Playgroud)

在 node 中运行时,我希望这会产生一个无限的数字列表,每个数字增加 1,当它超过 JavaScript 中的最大数字时最终会出错,或者我会用 Ctrl-C(在 Windows 上)杀死它。我没想到的是,在记录的数字少于 1000 个后,它会停止而不会出错。

我得到的是写入控制台的数字 1 - 783。

然后它停止了。

每次。

我运行它。它停在 783(这不是一个特殊数字 - 256、1024 等)。它不会记录错误或“完成”。它只是停止。

我原以为它会“永远”运行,抛出错误,或者至少注销“完成”。但它只是停止并将我返回到命令行。

我做了一些挖掘并认为它可能与可选的并发参数有关,Number.POSITIVE_INFINITY根据expand文档默认

但是,如果我明确指定此参数的值,我会得到同样奇怪的结果 - 即 1 的并发值(始终为 ~860)比并发值 1,000,000(回到 783)产生的值多于 100(~850) ,以及超过 200 (~840)。似乎大多数值都是在并发为 1 的情况下发出的,然后随着并发性的增加,它慢慢地下降到 783 的低值。也就是说,我不知道是否真的是并发影响了这一点,尽管它看起来确实相关。

我快速浏览了 expand 的源代码,但没有看到任何可以解释这一点的内容。

这不是我会在生产中运行的代码,但它激起了我的好奇心。有没有人对什么可以解释这一点有任何想法?

  • RxJS 6.3.2

  • 节点 8.9.4

谢谢,

电讯

小智 5

这是预期的行为。我不知道为什么你没有从 node.js 收到任何错误消息,但它实际上如下所示:

RangeError: Maximum call stack size exceeded
at SafeSubscriber.__tryOrUnsub (/Users/ojkwon/github/goj/node_modules/rxjs/internal/Subscriber.js:212:18)
...
Run Code Online (Sandbox Code Playgroud)

简而言之,它遇到了堆栈溢出。

当您调用 时expand,您将返回通过创建的可观察对象of,并通过扩展运算符递归。Rx v5 及更高版本的调度行为not scheduling anything默认为 - 意味着,您创建了同步 observable 并再次创建同步堆栈,最终爆炸。

您可以通过使用异步源或给定值以异步方式显式调度来进行无限扩展:即,如果您expand在异步调度程序中调度返回的 observable

const { of, asyncScheduler } = require('rxjs');
const { expand } = require('rxjs/operators');

of(1)
  .pipe(expand(x => of(x + 1, asyncScheduler)))
  .subscribe(
    x => console.log(x),
    err => console.log(err),
    () => console.log('complete'));
Run Code Online (Sandbox Code Playgroud)

那么你可以观察到操作员不会停止。