取消管道streams2管道并清空它的正确方法(不仅仅是刷新)

zam*_*uts 13 pipeline pipe node.js node.js-stream

前提

我正在尝试找到在Node.js中过早终止一系列管道流(管道)的正确方法:有时我想在流完成之前优雅地中止流.具体来说,我正在处理大部分objectMode: true和非原生的并行流,但这并不重要.

问题

问题是当我unpipe在管道中时,数据保留在每个流的缓冲区中并被drain编辑.对于大多数中间流(例如/ ),这可能没问题,但是最后一个仍然流向其写目标(例如文件或数据库或套接字或w/e).如果缓冲区包含数百或数千个块需要花费大量时间来消耗,则这可能是有问题的.我希望它立即停止,即不要排水; 为什么浪费周期和内存对数据无关紧要?ReadableTransformWritable

根据我去的路线,我收到"写后结束"错误,或者当流找不到现有管道时发生异常.

什么是优雅地杀死表单中的流管道的正确方法a.pipe(b).pipe(c).pipe(z)

解?

我提出的解决方案是3步:

  1. unpipe 管道中的每个流以相反的顺序排列
  2. 清空实现的每个流的缓冲区 Writable
  3. end 每个实现的流 Writable

一些伪代码说明了整个过程:

var pipeline = [ // define the pipeline
  readStream,
  transformStream0,
  transformStream1,
  writeStream
];

// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
    if ( !tmpBuildStream ) {
        tmpBuildStream = stream;
        continue;
    }
    tmpBuildStream = lastStream.pipe(stream);
});

// sleep, timeout, event, etc...

// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
    if ( !tmpTearStream ) {
        tmpTearStream = stream;
        continue;
    }
    tmpTearStream = stream.unpipe(tmpTearStream);
});

// empty and end the pipeline
pipeline.forEach(function(stream) {
  if ( typeof stream._writableState === 'object' ) { // empty
    stream._writableState.length -= stream._writableState.buffer.length;
    stream._writableState.buffer = [];
  }
  if ( typeof stream.end === 'function' ) { // kill
    stream.end();
  }
});
Run Code Online (Sandbox Code Playgroud)

我真的很担心stream._writableState内部bufferlength属性的使用和修改(_表示私有财产).这似乎是一个黑客.还要注意,由于我是管道,像pauseresume我们出了问题(根据我从IRC收到的建议).

我还整理了一个可以从github获取的可运行版本(非常草率):https://github.com/zamnuts/multipipe-proto(git clone,npm install,view readme,npm start)

Pio*_*ido 2

\n

在这种特殊情况下,我认为我们应该摆脱具有 4 个不同的未完全自定义流的结构。将它们连接在一起会产生链依赖关系,如果我们没有实现自己的机制,这种依赖关系将很难控制。

\n
\n\n

我想在这里重点关注您的实际目标:

\n\n
 INPUT >----[read] \xe2\x86\x92 [transform0] \xe2\x86\x92 [transform1] \xe2\x86\x92 [write]-----> OUTPUT\n               |          |              |            |\n KILL_ALL------o----------o--------------o------------o--------[nothing to drain]\n
Run Code Online (Sandbox Code Playgroud)\n\n

我相信上述结构可以通过组合自定义来实现:

\n\n
    \n
  1. duplex stream - 供自己_write(chunk, encoding, cb)_read(bytes)实施

  2. \n
  3. transform stream- 供自己_transform(chunk, encoding, cb)实施。

  4. \n
\n\n
\n

由于您正在使用该writable-stream-parallel包,您可能还想查看它们的库,因为它们的duplex实现可以在这里找到: https: //github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js。\n 他们的transform stream实现在这里:https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js。他们在这里处理highWaterMark

\n
\n\n

可能的解决方案

\n\n

他们的写入流:https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一个有趣的功能writeOrBuffer,我想你也许可以稍微调整一下它来中断写入来自缓冲区的数据。

\n\n

注意:这3 个标志控制缓冲区清除:

\n\n
( !finished && !state.bufferProcessing && state.buffer.length )\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

参考:

\n\n\n