Bre*_*rto 8 reactive-programming rxjs rxjs5
我有一个用例,每当另一个通知者Observable发出时,我都需要Observable跳过其下一个发出。
source: |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>
Run Code Online (Sandbox Code Playgroud)
基本上,我希望有一个叫做的运算符skipNextWhen
,它以可观察的方式接收通知程序,并跳过源中的下一个发射。
我尝试使用使用pausable
运算符的实现(使用来重新实现switchMap
),但无法使其正常工作。
停顿的
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}
function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}
Observable.prototype.pausable = pausable;
Run Code Online (Sandbox Code Playgroud)
skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';
declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}
function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}
Observable.prototype.skipNextWhen = skipNextWhen;
Run Code Online (Sandbox Code Playgroud)
我应该考虑使用更适合的运算符吗?我在当前实现中看到的行为是,结果Observable发出一次,然后再也不会发出-即使通知者Observable从未发出。
我可以想到两种解决方案:
使用.filter()
,.do()
以及一些副作用。
即使不是那种“Rx”方式,这也可能更容易理解解决方案:
function skipNextWhen(other) {
let skipNext = false;
return this.merge(other.do(() => skipNext = true).filter(() => false))
.filter(val => {
const doSkip = skipNext;
skipNext = false;
return !doSkip;
});
}
Run Code Online (Sandbox Code Playgroud)
我merge()
只是用来更新skipNext
,other
的值总是被忽略。
使用.scan()
:
该解决方案没有任何状态变量和副作用。
function skipNextWhen(other) {
const SKIP = 'skip';
return this.merge(other.mapTo(SKIP))
.scan((acc, val) => {
if (acc === SKIP) {
return null;
} else if (val === SKIP) {
return SKIP;
} else {
return val;
}
}, [])
.filter(val => Boolean(val) && val !== SKIP);
}
Run Code Online (Sandbox Code Playgroud)
基本上,当SKIP
到达时我会立即返回它,因为它会被操作员再次在acc
参数中传递scan()
,然后被filter()
.
如果我收到一个正常值但先前的值是SKIP
我忽略它并返回null
稍后过滤掉的值。
两种解决方案都给出了相同的结果:
Observable.prototype.skipNextWhen = skipNextWhen;
const source = Observable.range(1, 10)
.concatMap(val => Observable.of(val).delay(100));
source
.skipNextWhen(Observable.interval(350))
.subscribe(console.log);
Run Code Online (Sandbox Code Playgroud)
这将打印以下内容:
1
2
3
5
6
8
9
10
Run Code Online (Sandbox Code Playgroud)
请注意,您实际上并不是在创建新的运算符。您只有操作员链的快捷方式。例如,这不会让您other
在源完成时取消订阅。
我已经启动了一个(非常)小的库,其中包含我想要的一些 rxjs 实用程序。它恰好有一个功能可以完全按照您的要求执行:skipAfter
。从文档:
source: -1-----2-----3-----4-----5-|
skip$: ----0----------0-0----------
result: -1-----------3-----------5-|
Run Code Online (Sandbox Code Playgroud)
图书馆在这里:https : //github.com/simontonsoftware/s-rxjs-utils
归档时间: |
|
查看次数: |
625 次 |
最近记录: |