RxJS自定义操作员内部变量

Jef*_*D23 4 javascript functional-programming rxjs reactivex

在RxJS中使用/改变自定义运算符闭包中的变量是否存在缺陷?我意识到它违反了"纯粹"的功能原则,你可以使用scan这个简单的例子,但我特别要求下面的基本模式的有形技术问题:

const custom = () => {

  let state = 0; 

  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  )
}

// Usage
const obs = interval(1000).pipe(custom())

obs.subscribe()
Run Code Online (Sandbox Code Playgroud)

car*_*ant 7

您在custom运营商中存储状态的方式至少存在两个问题.

第一个问题是,这样做意味着操作员不再是参考透明的.也就是说,如果用运算符的返回值替换运算符的调用,则行为是不同的:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

第二个问题 - 如另一个答案中所提到的 - 不同的订阅将在其next通知中接收不同的值,因为运营商内的状态是共享的.

例如,如果source observable是同步的,则连续订阅将看到不同的值:

const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  let state = 0; 
  return pipe(
    map(next => state * next),
    tap(_ => state += 1),
    share()
  );
};

const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n  => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)

但是,可以编写一个与您的custom操作员非常相似的操作符,并使其在所有情况下都能正常运行.为此,必须确保运营商内的任何州都是按订阅.

可管理运算符只是一个带有observable并返回一个observable的函数,因此您可以使用它defer来确保您的状态是按订阅,如下所示:

const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;

const custom = () => {
  return source => defer(() => {
    let state = 0; 
    return source.pipe(
      map(next => state * next),
      tap(_ => state += 1)
    );
  }).pipe(share());
};

const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));

const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }
Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
Run Code Online (Sandbox Code Playgroud)