Jus*_*tin 14 rxjs typescript rxjs5 angular
我在Angular 2项目中使用RxJs版本5.我想创建一些observable,但我不希望立即调用observable.
在版本4中,您可以使用(例如)受控命令或Pausable Buffers来控制调用.但是,该功能尚未在版本5中提供.
如何在RxJs 5中获得这种功能?
我的最终目标是对创建的observable进行排队并逐个调用它们.只有在成功处理上一个时才会调用下一个.当一个失败时,队列被清空.
编辑
通过@Niklas Fasching的评论,我可以使用Publish操作创建一个可行的解决方案.
// Queue to queue operations
const queue = [];
// Just a function to create Observers
function createObserver(id): Observer {
return {
next: function (x) {
console.log('Next: ' + id + x);
},
error: function (err) {
console.log('Error: ' + err);
},
complete: function () {
console.log('Completed');
}
};
};
// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {
console.log('add ' + name);
// Create an async operation
var observable = Rx.Observable.create(observer => {
// Some async operation
setTimeout(() =>
observer.next(' Done'),
500);
});
// Hold the operation
var published = observable.publish();
// Add Global subscribe
published.subscribe(createObserver('Global'));
// Add it to the queue
queue.push(published);
// Return the published so the caller could add a subscribe
return published;
};
// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));
// Dequeue and run the first
queue.shift().connect();
Run Code Online (Sandbox Code Playgroud)
Ben*_*esh 20
controlled
时,仍然会在订阅时调用Observablecontrolled
RxJS 4中的操作员实际上只是在操作员之后控制Observable的流量.到目前为止,它都通过并缓冲该运营商.考虑一下:
(RxJS 4)http://jsbin.com/yaqabe/1/edit?html,js,console
const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();
source.subscribe(x => console.log(x));
setTimeout(() => {
console.log('requesting');
source.request(2);
}, 1000);
Run Code Online (Sandbox Code Playgroud)
您会注意到,在获得两个值之前Observable.range(0, 5)
,do
立即发出所有五个值...然后暂停一秒(1000毫秒).
所以,它实际上是背压控制的错觉.最后,该运算符中有一个无界缓冲区.一个数组正在收集它正在发送的"上方"的所有内容,并等待您通过调用将其出列request(n)
.
controlled
在这个答案的时候,controlled
RxJS 5中不存在运算符.这有几个原因:1.没有请求它,2.它的名字显然令人困惑(因此StackOverflow上的这个问题)
如何复制RxJS 5中的行为(现在):http://jsbin.com/metuyab/1/edit?html,js,console
// A subject we'll use to zip with the source
const controller = new Rx.Subject();
// A request function to next values into the subject
function request(count) {
for (let i = 0; i < count; i++) {
controller.next(count);
}
}
// We'll zip our source with the subject, we don't care about what
// comes out of the Subject, so we'll drop that.
const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);
// Same effect as above Rx 4 example
source.subscribe(x => console.log(x));
// Same effect as above Rx 4 example
request(3);
Run Code Online (Sandbox Code Playgroud)
对于"真正的背压控制",一个解决方案是承诺的迭代器.IoP并非没有问题,但有一点,每回合都有一个对象分配.每个值都有一个与之关联的Promise.另一方面,取消不存在,因为它是承诺.
一个更好的,基于Rx的方法是让一个主题"提供"你的可观察链的顶部,然后你在其余的组成.
像这样:http://jsbin.com/qeqaxo/2/edit?js,console
// start with 5 values
const controller = new Rx.BehaviorSubject(5);
// some observable source, in this case, an interval.
const source = Rx.Observable.interval(100)
const controlled = controller.flatMap(
// map your count into a set of values
(count) => source.take(count),
// additional mapping for metadata about when the block is done
(count, value, _, index) => {
return { value: value, done: count - index === 1 };
})
// when the block is done, request 5 more.
.do(({done}) => done && controller.next(5))
// we only care about the value for output
.map(({value}) => value);
// start our subscription
controlled.subscribe(x => {
console.log(x)
});
Run Code Online (Sandbox Code Playgroud)
...我们有一些可流动的可观察类型的计划,在不久的将来也会有真正的背压控制.对于这种情况,这将更令人兴奋和更好.
您可以通过发布 observable 将observable的开头从订阅分离到它.发布的observable只有在调用connect之后才会启动.
请注意,所有订阅者将共享对可观察序列的单个订阅.
var published = Observable.of(42).publish();
// subscription does not start the observable sequence
published.subscribe(value => console.log('received: ', value));
// connect starts the sequence; subscribers will now receive values
published.connect();
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4559 次 |
最近记录: |