mai*_*mic 6 javascript reactive-programming reactive-extensions-js rxjs
我在RxJS中遇到了一个特殊的生产者消费者问题:生产者慢慢生产元素.消费者正在请求元素,并且经常必须等待生产者.这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
Run Code Online (Sandbox Code Playgroud)
有时请求会中止.生成的元素只应在未中止请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
Run Code Online (Sandbox Code Playgroud)
第一个请求r1将使用第一个生成的元素p1,但在它可以消耗之前r1被中止.生成并在第二次请求时消耗.第二次中止被忽略,因为之前没有发出未答复的请求.第三个请求必须等待下一个生成的元素,并且在生成之前不会中止.因此,在生产后立即消耗.a(r1)p1p1c(p1, r2)r2a(?)r3p2p2p2c(p2, r3)
我怎样才能在RxJS中实现这一目标?
编辑:
我创建了一个例子与jsbin一个QUnit测试.您可以编辑该功能createConsume(produce, request, abort)以尝试/测试您的解决方案.
该示例包含先前接受的答案的函数定义.
此解决方案忽略不遵循未答复请求的中止:
const {merge} = Rx.Observable;
Rx.Observable.prototype.wrapValue = function(wrapper) {
wrapper = (wrapper || {});
return this.map(function (value) {
wrapper.value = value;
return wrapper;
});
};
function createConsume(produce, request, abort) {
return merge(
produce.wrapValue({type: 'produce'}),
request.wrapValue({type: 'request'}),
abort.wrapValue({type: 'abort'})
)
.scan(
[false, []],
([isRequest, products], e) => {
// if last time the request was answered
if (isRequest && products.length) {
// remove consumed product
products.shift();
// mark request as answered
isRequest = false;
}
if (e.type === 'produce') {
// save product to consume later
products.push(e.value);
} else {
// if evaluated to false, e.type === 'abort'
isRequest = (e.type === 'request');
}
return [isRequest, products];
}
)
.filter( ([isRequest, products]) => (isRequest && products.length) )
.map( ([isRequest, products]) => products[0] ); // consume
}
Run Code Online (Sandbox Code Playgroud)
JSBin 最新测试中的代码。
| 归档时间: |
|
| 查看次数: |
729 次 |
| 最近记录: |