将数据放回可读流

Mat*_*son 5 stream node.js

TL; DR如何从流中读取一些数据然后将其放回,以便其他消费者获得相同的data事件?

这是一个可读的流,流1 ... Infinity:

var Readable = require('stream').Readable;

var readable = new Readable();

var c = 0;

readable._read = function () {

    var self = this;

    setTimeout(function () {
        self.push((++c).toString());
    }, 500);
};
Run Code Online (Sandbox Code Playgroud)

我想读取第一个data事件,查看数据,然后将流"重置"到其原始状态,并允许其他另一个data侦听器使用第一个事件,就好像它从未发生过一样.我认为unshift()这将是正确的方法,如文档中所述:

readable.unshift(块)#

块缓冲区| 字符串要卸载到读取队列的数据块这在某些情况下非常有用,在这种情况下,解析器正在使用流,而解析器需要"取消"一些它乐观地从源中取出的数据,以便流可以传递给其他一方.

这对我的需求来说听起来很完美,但它并不符合我的预期:

...

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Heh?! Outputs 2, how about 1?
    });

});
Run Code Online (Sandbox Code Playgroud)

Mat*_*son 8

所以我想出了答案:

当您调用stream.unshift()if时,如果流处于流动模式,则会立即发出数据事件.所以当我在我的例子中添加监听器时,船已经航行了.

readable.unshift(d);                  // emits 'data' event

readable.on('data', function (d) {    // missed `data` event
    console.log(d.toString());
});
Run Code Online (Sandbox Code Playgroud)

有几种方法可以让它按照我的预期运作:

1)在取消之前添加新的监听器:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.unshift(d);                    // Put the 1 back on the stream

});
Run Code Online (Sandbox Code Playgroud)

2)暂停并恢复流:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1
    readable.pause();                       // Stops the stream from flowing
    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.resume();                      // Start the stream flowing again

});
Run Code Online (Sandbox Code Playgroud)