zam*_*uts 13 pipeline pipe node.js node.js-stream
我正在尝试找到在Node.js中过早终止一系列管道流(管道)的正确方法:有时我想在流完成之前优雅地中止流.具体来说,我正在处理大部分objectMode: true
和非原生的并行流,但这并不重要.
问题是当我unpipe
在管道中时,数据保留在每个流的缓冲区中并被drain
编辑.对于大多数中间流(例如/ ),这可能没问题,但是最后一个仍然流向其写目标(例如文件或数据库或套接字或w/e).如果缓冲区包含数百或数千个块需要花费大量时间来消耗,则这可能是有问题的.我希望它立即停止,即不要排水; 为什么浪费周期和内存对数据无关紧要?Readable
Transform
Writable
根据我去的路线,我收到"写后结束"错误,或者当流找不到现有管道时发生异常.
什么是优雅地杀死表单中的流管道的正确方法a.pipe(b).pipe(c).pipe(z)
?
我提出的解决方案是3步:
unpipe
管道中的每个流以相反的顺序排列Writable
end
每个实现的流 Writable
一些伪代码说明了整个过程:
var pipeline = [ // define the pipeline
readStream,
transformStream0,
transformStream1,
writeStream
];
// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
if ( !tmpBuildStream ) {
tmpBuildStream = stream;
continue;
}
tmpBuildStream = lastStream.pipe(stream);
});
// sleep, timeout, event, etc...
// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
if ( !tmpTearStream ) {
tmpTearStream = stream;
continue;
}
tmpTearStream = stream.unpipe(tmpTearStream);
});
// empty and end the pipeline
pipeline.forEach(function(stream) {
if ( typeof stream._writableState === 'object' ) { // empty
stream._writableState.length -= stream._writableState.buffer.length;
stream._writableState.buffer = [];
}
if ( typeof stream.end === 'function' ) { // kill
stream.end();
}
});
Run Code Online (Sandbox Code Playgroud)
我真的很担心stream._writableState
内部buffer
和length
属性的使用和修改(_
表示私有财产).这似乎是一个黑客.还要注意,由于我是管道,像pause
和resume
我们出了问题(根据我从IRC收到的建议).
我还整理了一个可以从github获取的可运行版本(非常草率):https://github.com/zamnuts/multipipe-proto(git clone,npm install,view readme,npm start)
\n\n\n在这种特殊情况下,我认为我们应该摆脱具有 4 个不同的未完全自定义流的结构。将它们连接在一起会产生链依赖关系,如果我们没有实现自己的机制,这种依赖关系将很难控制。
\n
我想在这里重点关注您的实际目标:
\n\n INPUT >----[read] \xe2\x86\x92 [transform0] \xe2\x86\x92 [transform1] \xe2\x86\x92 [write]-----> OUTPUT\n | | | |\n KILL_ALL------o----------o--------------o------------o--------[nothing to drain]\n
Run Code Online (Sandbox Code Playgroud)\n\n我相信上述结构可以通过组合自定义来实现:
\n\nduplex stream
- 供自己_write(chunk, encoding, cb)
和_read(bytes)
实施
transform stream
- 供自己_transform(chunk, encoding, cb)
实施。
\n\n\n由于您正在使用该
\nwritable-stream-parallel
包,您可能还想查看它们的库,因为它们的duplex
实现可以在这里找到: https: //github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js。\n 他们的transform stream
实现在这里:https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js。他们在这里处理highWaterMark。
可能的解决方案
\n\n他们的写入流:https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一个有趣的功能writeOrBuffer
,我想你也许可以稍微调整一下它来中断写入来自缓冲区的数据。
注意:这3 个标志控制缓冲区清除:
\n\n( !finished && !state.bufferProcessing && state.buffer.length )\n
Run Code Online (Sandbox Code Playgroud)\n\n参考:
\n\n\n 归档时间: |
|
查看次数: |
1229 次 |
最近记录: |