ava*_*sin 15 stream node.js backpressure
我正在写一个模块,它是一个可写的流.我想为我的用户实现管道接口.
如果发生某些错误,我需要暂停可读流并发出错误事件.然后,用户将决定 - 如果他没有错误,他应该能够恢复数据处理.
var writeable = new BackPressureStream();
writeable.on('error', function(error){
console.log(error);
writeable.resume();
});
var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);
Run Code Online (Sandbox Code Playgroud)
我看到该节点为我们提供readable.pause()了可用于暂停可读流的方法.但我无法得到如何从我的可写流模块中调用它:
var Writable = require('stream').Writable;
function BackPressureStream(options) {
Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);
BackPressureStream.prototype._write = function(chunk, encoding, done) {
done();
};
BackPressureStream.prototype.resume = function() {
this.emit('drain');
}
Run Code Online (Sandbox Code Playgroud)
如何在可写流中实现背压?
PS可以使用pipe/unpipe提供可读流作为参数的事件.但也有人说,对于管道流,暂停的唯一机会是从可写入中删除可读流.
我做对了吗?我必须取消管理可写流,直到用户呼叫恢复为止?用户调用恢复后,我应该管道可读流回来?
基本上,据我了解,您希望在发生错误事件时对流施加背压。你有几个选择。
首先,正如您已经确定的那样,用于pipe获取读取流的实例并执行一些奇特的步法。
另一种选择是创建一个提供此功能的包装可写流(即,它采用 aWritableStream作为输入,并且在实现流函数时,将数据传递到提供的流。
基本上你最终会得到类似的东西
source stream -> wrapping writable -> writable
https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream涉及实现可写流。
对您来说关键是,如果底层可写中发生错误,您将在流上设置一个标志,并且下一次调用发生时write,您将缓冲该块,存储回调并仅调用。就像是
// ...
constructor(wrappedWritableStream) {
wrappedWritableStream.on('error', this.errorHandler);
this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
if (this.hadError) {
// Note: until callback is called, this function won't be called again, so we will have maximum one stored
// chunk.
this.bufferedChunk = [chunk, encoding, callback];
} else {
wrappedWritableStream.write(chunk, encoding, callback);
}
}
// ...
errorHandler(err) {
console.error(err);
this.hadError = err;
this.emit(err);
}
// ...
recoverFromError() {
if (this.bufferedChunk) {
wrappedWritableStream.write(...this.bufferedChunk);
this.bufferedChunk = undefined;
}
this.hadError = false;
}
Run Code Online (Sandbox Code Playgroud)
注意:您应该只需要实现该write功能,但我鼓励您深入研究并尝试其他实现功能。
还值得注意的是,您在写入已发出错误事件的流时可能会遇到一些问题,但我将把它作为一个单独的问题留给您解决。
这是关于反压的另一个很好的资源 https://nodejs.org/en/docs/guides/backpressuring-in-streams/
| 归档时间: |
|
| 查看次数: |
1946 次 |
| 最近记录: |