Jef*_*D23 4 javascript functional-programming rxjs reactivex
在RxJS中使用/改变自定义运算符闭包中的变量是否存在缺陷?我意识到它违反了"纯粹"的功能原则,你可以使用scan这个简单的例子,但我特别要求下面的基本模式的有形技术问题:
const custom = () => {
let state = 0;
return pipe(
map(next => state * next),
tap(_ => state += 1),
share()
)
}
// Usage
const obs = interval(1000).pipe(custom())
obs.subscribe()
Run Code Online (Sandbox Code Playgroud)
您在custom运营商中存储状态的方式至少存在两个问题.
第一个问题是,这样做意味着操作员不再是参考透明的.也就是说,如果用运算符的返回值替换运算符的调用,则行为是不同的:
const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
let state = 0;
return pipe(
map(next => state * next),
tap(_ => state += 1),
share()
);
};
const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)
第二个问题 - 如另一个答案中所提到的 - 不同的订阅将在其next通知中接收不同的值,因为运营商内的状态是共享的.
例如,如果source observable是同步的,则连续订阅将看到不同的值:
const { pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
let state = 0;
return pipe(
map(next => state * next),
tap(_ => state += 1),
share()
);
};
const source = range(1, 2).pipe(custom());
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)
但是,可以编写一个与您的custom操作员非常相似的操作符,并使其在所有情况下都能正常运行.为此,必须确保运营商内的任何州都是按订阅.
可管理运算符只是一个带有observable并返回一个observable的函数,因此您可以使用它defer来确保您的状态是按订阅,如下所示:
const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
return source => defer(() => {
let state = 0;
return source.pipe(
map(next => state * next),
tap(_ => state += 1)
);
}).pipe(share());
};
const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));Run Code Online (Sandbox Code Playgroud)
.as-console-wrapper { max-height: 100% !important; top: 0; }Run Code Online (Sandbox Code Playgroud)
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>Run Code Online (Sandbox Code Playgroud)