RxJS:生产者 - 消费者中止

mai*_*mic 6 javascript reactive-programming reactive-extensions-js rxjs

我在RxJS中遇到了一个特殊的生产者消费者问题:生产者慢慢生产元素.消费者正在请求元素,并且经常必须等待生产者.这可以通过压缩生产者和请求流来实现:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);
Run Code Online (Sandbox Code Playgroud)

有时请求会中止.生成的元素只应在未中止请求后使用:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->
Run Code Online (Sandbox Code Playgroud)

第一个请求r1将使用第一个生成的元素p1,但在它可以消耗之前r1被中止.生成并在第二次请求时消耗.第二次中止被忽略,因为之前没有发出未答复的请求.第三个请求必须等待下一个生成的元素,并且在生成之前不会中止.因此,在生产后立即消耗.a(r1)p1p1c(p1, r2)r2a(?)r3p2p2p2c(p2, r3)

我怎样才能在RxJS中实现这一目标?

编辑: 我创建了一个例子与jsbin一个QUnit测试.您可以编辑该功能createConsume(produce, request, abort)以尝试/测试您的解决方案.

该示例包含先前接受的答案的函数定义.

mai*_*mic 0

此解决方案忽略不遵循未答复请求的中止:

const {merge} = Rx.Observable;

Rx.Observable.prototype.wrapValue = function(wrapper) {
    wrapper = (wrapper || {});
    return this.map(function (value) {
        wrapper.value = value;
        return wrapper;
    });
};

function createConsume(produce, request, abort) {
  return merge(
            produce.wrapValue({type: 'produce'}),
            request.wrapValue({type: 'request'}),
            abort.wrapValue({type: 'abort'})
         )
         .scan(
            [false, []],
            ([isRequest, products], e) => {
                // if last time the request was answered
                if (isRequest && products.length) {
                    // remove consumed product
                    products.shift();
                    // mark request as answered
                    isRequest = false;
                }
                if (e.type === 'produce') {
                    // save product to consume later
                    products.push(e.value);
                } else {
                    // if evaluated to false, e.type === 'abort'
                    isRequest = (e.type === 'request');
                }
                return [isRequest, products];
            }
         )
         .filter( ([isRequest, products]) => (isRequest && products.length) )
         .map( ([isRequest, products]) => products[0] ); // consume
}
Run Code Online (Sandbox Code Playgroud)

JSBin 最新测试中的代码。