RxJS v5+ 中的 `Observable.transduce` 发生了什么?

Saw*_*oes 5 method-chaining pipelining transducer rxjs

RxJS v4 曾经有一种Observable.transduce方法可以使用转换器。这允许使用在过去具有重大性能优势的独立于库的传感器运算符。

来源

RxJS v5.5 和 v6 具有可管道操作符和 v6 删除了方法链。因此,我假设 RxJS 操作符是标准的转换器。查看源代码,情况似乎并非如此。

RxJS v6 操作符的功能就像一个转换器,其中每个值在下一个值通过之前完全通过链传递,但 RxJS v6 操作符没有使用我在其他库中看到的标准转换器方法,这意味着,我不认为它们是便携式的。

关于传感器的整个事情是他们对集合本身一无所知。您可以编写 100 个普遍适用于任何集合或流类型的运算符,而不是专门为 observable 编写 100 个运算符。

在 RxJS v5 中是否.pipe一致.transduce或完全删除了这种方法?

use*_*222 2

我有完全相同的问题,但在任何地方都找不到答案。是的,你可以pipe,但我相信这将为每个操作员创建中间可观察量。但我不确定,这与阅读代码有关。

所以我想出了我自己的transduce运算符:

function transformForObserver(o) {
  return {
    "@@transducer/init": function() {
      return o;
    },
    "@@transducer/step": function(obs, input) {
      return obs.next(input);
    },
    "@@transducer/result": function(obs) {
      return obs.complete();
    }
  };
}

    const transduce = (obs, transducer) => {
      const xform = transducer(transformForObserver);

      return Observable.create(o => {
        return obs.subscribe({
          next: x => {
            const res = tryCatch(
              xform["@@transducer/step"],
              err => {
                console.error(`Error occurred in transducer/step!`, err);
                return err;
              }
            )(xform, o, x);

            if (res instanceof Error) { o.error(res); }
          },
          error: err => {
            console.error(`Error occurred in observable passed to Rx transduce fn!`, err);
            o.error(err);
          },
          complete: () => {o.complete();}
        });
      });
    }
Run Code Online (Sandbox Code Playgroud)

还没有测试过,如果有兴趣的话会尽快发布。

更新:我分叉了 jslongser 的传感器库并将此类传感器包含在其中。Fork 为https://github.com/brucou/transducers.js,函数为transduceLazyObservable。比照。测试使用示例。