在nodejs中从可写入的流中暂停管道可读流的正确方法是什么?

ava*_*sin 15 stream node.js backpressure

我正在写一个模块,它是一个可写的流.我想为我的用户实现管道接口.

如果发生某些错误,我需要暂停可读流并发出错误事件.然后,用户将决定 - 如果他没有错误,他应该能够恢复数据处理.

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
Run Code Online (Sandbox Code Playgroud)

我看到该节点为我们提供readable.pause()了可用于暂停可读流的方法.但我无法得到如何从我的可写流模块中调用它:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}
Run Code Online (Sandbox Code Playgroud)

如何在可写流中实现背压?

PS可以使用pipe/unpipe提供可读流作为参数的事件.但也有人说,对于管道流,暂停的唯一机会是从可写入中删除可读流.

我做对了吗?我必须取消管理可写流,直到用户呼叫恢复为止?用户调用恢复后,我应该管道可读流回来?

maj*_*ann 0

基本上,据我了解,您希望在发生错误事件时对流施加背压。你有几个选择。

首先,正如您已经确定的那样,用于pipe获取读取流的实例并执行一些奇特的步法。

另一种选择是创建一个提供此功能的包装可写流(即,它采用 aWritableStream作为输入,并且在实现流函数时,将数据传递到提供的流。

基本上你最终会得到类似的东西

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream涉及实现可写流。

对您来说关键是,如果底层可写中发生错误,您将在流上设置一个标志,并且下一次调用发生时write,您将缓冲该块,存储回调并仅调用。就像是

// ...
constructor(wrappedWritableStream) {
    wrappedWritableStream.on('error', this.errorHandler);
    this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have maximum one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit(err);
}
// ...
recoverFromError() {
    if (this.bufferedChunk) {
        wrappedWritableStream.write(...this.bufferedChunk);
        this.bufferedChunk = undefined;
    }
    this.hadError = false;
}
Run Code Online (Sandbox Code Playgroud)

注意:您应该只需要实现该write功能,但我鼓励您深入研究并尝试其他实现功能。

还值得注意的是,您在写入已发出错误事件的流时可能会遇到一些问题,但我将把它作为一个单独的问题留给您解决。

这是关于反压的另一个很好的资源 https://nodejs.org/en/docs/guides/backpressuring-in-streams/