What's the proper way to handle backpressure in a node.js Transform stream?

Sco*_*aad 16 javascript zlib node.js

Intro

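This is my first venture into writing node.js server-side code. It's been fun so far, but I'm having some difficulty understanding the proper way to implement things related to node.js streams.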

The Problem

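For testing and learning purposes, I'm working with large files whose content is zlib-compressed. The compressed content is binary data, with each packet being 38 bytes long. I'm trying to create a resulting file that looks almost identical to the original file, except that every group of 1024 38-byte packets is preceded by an uncompressed 31-byte header.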

Original file content (decompressed)

+----------+----------+----------+----------+
| packet 1 | packet 2 |  ......  | packet N |
| 38 bytes | 38 bytes |  ......  | 38 bytes |
+----------+----------+----------+----------+
Run Code Online (Sandbox Code Playgroud)

Resulting file content

+----------+--------------------------------+----------+--------------------------------+
| header 1 |    1024 38 byte packets        | header 2 |    1024 38 byte packets        |
| 31 bytes |       zlib compressed          | 31 bytes |       zlib compressed          |
+----------+--------------------------------+----------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)

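As you can see, it's somewhat of a translation problem: I'm taking some source stream as input and then slightly transforming it into some output stream. Therefore, it felt natural to implement a Transform stream.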

The class is just trying to accomplish the following:

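  1. Take a stream as input.
  2. zlib-inflate the data chunks to count the number of packets, put 1024 of them together, zlib-deflate them, and prepend a header.
  3. Pass the newly generated chunk on through the pipe with this.push(chunk).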
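A minimal sketch of steps 2 and 3, assuming the input has already been inflated (for example by piping through zlib.createInflate() first, as one of the answers below suggests) and using zlib.deflateSync() to keep the example short. PacketFramer, makeHeader and the header contents are hypothetical; the question doesn't say what the 31 header bytes hold.

```javascript
const { Transform } = require('stream');
const zlib = require('zlib');

const PACKET_SIZE = 38;
const PACKETS_PER_GROUP = 1024;
const GROUP_SIZE = PACKET_SIZE * PACKETS_PER_GROUP; // 38912 bytes
const HEADER_SIZE = 31;

// Hypothetical header: the real layout is unknown, so this writes a group
// counter into an otherwise zero-filled 31-byte buffer.
function makeHeader(groupIndex) {
  const header = Buffer.alloc(HEADER_SIZE);
  header.writeUInt32BE(groupIndex, 0);
  return header;
}

// Expects already-inflated packet data as input.
class PacketFramer extends Transform {
  constructor(options) {
    super(options);
    this._pending = Buffer.alloc(0); // inflated bytes not yet framed
    this._groupIndex = 0;
  }

  _emitGroup(group) {
    this.push(makeHeader(this._groupIndex++)); // uncompressed header first
    this.push(zlib.deflateSync(group));        // then the deflated packets
  }

  _transform(chunk, encoding, callback) {
    this._pending = Buffer.concat([this._pending, chunk]);
    // Frame every complete group of 1024 packets currently buffered.
    while (this._pending.length >= GROUP_SIZE) {
      this._emitGroup(this._pending.subarray(0, GROUP_SIZE));
      this._pending = this._pending.subarray(GROUP_SIZE);
    }
    callback();
  }

  _flush(callback) {
    // A trailing partial group (fewer than 1024 packets) is framed as well.
    if (this._pending.length > 0) this._emitGroup(this._pending);
    callback();
  }
}
```

Usage would be along the lines of inp.pipe(zlib.createInflate()).pipe(new PacketFramer()).pipe(out). The sketch ignores push()'s return value because each input chunk produces output roughly proportional to its size, and stream.Transform holds back the next chunk while the readable side is full. deflateSync() blocks the event loop for each ~38 KB group; the asynchronous zlib.deflate() with the callback invoked afterwards avoids that at the cost of more code.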

The use case is something like:

var fs = require('fs');
var me = require('./me'); // Where my Transform stream code sits
var inp = fs.createReadStream('depth_1000000');
var out = fs.createWriteStream('depth_1000000.out');
inp.pipe(me.createMyTranslate()).pipe(out);
Run Code Online (Sandbox Code Playgroud)

The Question(s)

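Assuming Transform is a good choice for this use case, I seem to be running into a possible backpressure issue. My calls to this.push(chunk) within _transform keep returning false. Why would this be, and how should such things be handled?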

Mik*_*ert 12

This 2013 question is all I was able to find on how to deal with "backpressure" when creating Node Transform streams.

From the Node 7.10.0 Transform stream and Readable stream documentation, what I gathered was that once push returns false, nothing else should be pushed until _read is called.

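The Transform docs don't mention _read, except to say that the base Transform class implements it (and _write). I found the information about push returning false and _read being called in the Readable stream docs.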
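As a sketch of that Readable rule in isolation (push until push() returns false, then wait for the next _read() call), here is a hypothetical QueueReadable that serves an in-memory array of strings. The class and its readCalls counter are illustrative only.

```javascript
const { Readable } = require('stream');

// Hypothetical QueueReadable: pushes items until push() returns false,
// then stops and resumes from the same position on the next _read().
class QueueReadable extends Readable {
  constructor(items, options) {
    super(options);
    this._items = items;
    this._next = 0;
    this.readCalls = 0; // only here to show how often _read() is invoked
  }

  _read() {
    this.readCalls++;
    while (this._next < this._items.length) {
      if (!this.push(this._items[this._next++])) {
        return; // buffer full: the next _read() continues from this._next
      }
    }
    this.push(null); // every item delivered: signal end of stream
  }
}

// highWaterMark of 2 bytes, so the buffer fills after two 1-byte items.
const source = new QueueReadable(['a', 'b', 'c'], { highWaterMark: 2 });
let text = '';
let chunk;
while ((chunk = source.read()) !== null) text += chunk;
console.log(text, source.readCalls);
```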

The only other authoritative comment I found on Transform backpressure only mentions it as an issue, and that was in the comment at the top of the node file _stream_transform.js.

Here is the part of that comment about backpressure:

// This way, back-pressure is actually determined by the reading side,
// since _read has to be called to start processing a new chunk.  However,
// a pathological inflate type of transform can cause excessive buffering
// here.  For example, imagine a stream where every byte of input is
// interpreted as an integer from 0-255, and then results in that many
// bytes of output.  Writing the 4 bytes {ff,ff,ff,ff} would result in
// 1kb of data being output.  In this case, you could write a very small
// amount of input, and end up with a very large amount of output.  In
// such a pathological inflating mechanism, there'd be no way to tell
// the system to stop doing the transform.  A single 4MB write could
// cause the system to run out of memory.
//
// However, even in such a pathological case, only a single written chunk
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
Run Code Online (Sandbox Code Playgroud)

Solution Example

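Here's the solution I pieced together to handle backpressure in a Transform stream, which I'm pretty sure works. (I haven't written any real tests, which would require writing a Writable stream to control the backpressure.)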

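This is a rudimentary line transform that would need more work to be a proper line transform, but it does demonstrate handling "backpressure".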

const stream = require('stream');

class LineTransform extends stream.Transform
{
    constructor(options)
    {
        super(options);

        this._lastLine = "";
        this._continueTransform = null;
        this._transforming = false;
        this._debugTransformCallCount = 0;
    }

    _transform(chunk, encoding, callback)
    {
        if (encoding === "buffer")
            return callback(new Error("Buffer chunks not supported"));

        if (this._continueTransform !== null)
            return callback(new Error("_transform called before previous transform has completed."));

        // DEBUG: Uncomment for debugging help to see what's going on
        //console.error(`${++this._debugTransformCallCount} _transform called:`);

        // Guard (so we don't call _continueTransform from _read while it is being
        // invoked from _transform)
        this._transforming = true;

        // Do our transforming (in this case splitting the big chunk into lines)
        let lines = (this._lastLine + chunk).split(/\r\n|\n/);
        this._lastLine = lines.pop();

        // In order to respond to "back pressure" create a function
        // that will push all of the lines stopping when push returns false,
        // and then resume where it left off when called again, only calling
        // the "callback" once all lines from this transform have been pushed.
        // Resuming (until done) will be done by _read().
        let nextLine = 0;
        this._continueTransform = () =>
            {
                while (nextLine < lines.length)
                {
                    if (!this.push(lines[nextLine++] + "\n"))
                    {
                        // Backpressure: if lines remain, they have to wait;
                        // _read() will call this function again to resume here.
                        if (nextLine < lines.length)
                            return;
                    }
                }

                // DEBUG: Uncomment for debugging help to see what's going on
                //console.error(`_continueTransform ${this._debugTransformCallCount} finished\n`);

                // All lines are pushed, remove this function from the LineTransform instance
                this._continueTransform = null;
                return callback();
            };

        // Start pushing the lines
        this._continueTransform();

        // Turn off guard allowing _read to continue the transform pushes if needed.
        this._transforming = false;
    }

    _flush(callback)
    {
        if (this._lastLine.length > 0)
        {
            this.push(this._lastLine);
            this._lastLine = "";
        }

        return callback();
    }

    _read(size)
    {
        // DEBUG: Uncomment for debugging help to see what's going on
        //if (this._transforming)
        //    console.error(`_read called during _transform ${this._debugTransformCallCount}`);

        // If a transform has not pushed every line yet, continue that transform
        // otherwise just let the base class implementation do its thing.
        if (!this._transforming && this._continueTransform !== null)
            this._continueTransform();
        else
            super._read(size);
    }
}
Run Code Online (Sandbox Code Playgroud)

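I tested the above by running it with the debug lines uncommented on a ~10000-line, ~200KB file. Redirect stdout or stderr (or both) to a file to separate the debug statements from the expected output (node test.js > out.log 2> err.log).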

// test.js: LineTransform (above) must be defined in, or required into, this file
const fs = require('fs');
let inStrm = fs.createReadStream("testdata/largefile.txt", { encoding: "utf8" });
let lineStrm = new LineTransform({ encoding: "utf8", decodeStrings: false });
inStrm.pipe(lineStrm).pipe(process.stdout);
Run Code Online (Sandbox Code Playgroud)

Helpful debugging hints

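When I originally wrote this, I didn't realize that _read could be called before _transform returned, so I had not implemented the this._transforming guard, and I was getting the following error: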

Error: no writecb in Transform class
    at afterTransform (_stream_transform.js:71:33)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at LineTransform._continueTransform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:44:13)
    at LineTransform._transform (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:46:21)
    at LineTransform.Transform._read (_stream_transform.js:167:10)
    at LineTransform._read (/userdata/mjl/Projects/personal/srt-shift/dist/textfilelines.js:56:15)
    at LineTransform.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:331:12)
    at writeOrBuffer (_stream_writable.js:317:5)
    at LineTransform.Writable.write (_stream_writable.js:243:11)
Run Code Online (Sandbox Code Playgroud)

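Looking at the node implementation, I realized that this error means the callback given to _transform was called more than once. I didn't find much information about this error either, so I thought I'd include what I figured out here.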
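One way to catch this kind of bug closer to its source is to wrap the callback so a second call throws with a stack trace pointing at the duplicate caller, rather than failing deep inside the stream internals. onceOnly is a hypothetical debugging helper, not part of Node; newer Node versions report the same mistake with the error code ERR_MULTIPLE_CALLBACK instead of "no writecb".

```javascript
// Hypothetical debugging helper: the wrapped callback throws if it is
// invoked a second time.
function onceOnly(callback, label) {
  let called = false;
  return function guarded(...args) {
    if (called) {
      throw new Error(`${label}: callback invoked more than once`);
    }
    called = true;
    return callback.apply(this, args);
  };
}

// Usage inside a Transform (sketch):
//   _transform(chunk, encoding, callback) {
//     callback = onceOnly(callback, '_transform');
//     ...
//   }
```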


rob*_*lep 6

I think a Transform is suitable for this, but I would perform the inflate as a separate step in the pipeline.

Here's a quick and largely untested example:

var zlib        = require('zlib');
var stream      = require('stream');
var transformer = new stream.Transform();

// Properties used to keep internal state of transformer.
transformer._buffers    = [];
transformer._inputSize  = 0;
transformer._targetSize = 1024 * 38;

// Dump one 'output packet'
transformer._dump       = function(done) {
  // Concatenate buffers. Keep the data as a Buffer: converting it to a
  // 'binary' string and passing that string to zlib.deflate() (or to
  // new Buffer()) would re-encode it as UTF-8 and corrupt bytes above 0x7f.
  var buffer = Buffer.concat(this._buffers);

  // Take first 1024 packets.
  var packetBuffer = buffer.subarray(0, this._targetSize);

  // Keep the rest and reset counter.
  this._buffers   = [ buffer.subarray(this._targetSize) ];
  this._inputSize = this._buffers[0].length;

  // output header (placeholder; the real header is 31 bytes)
  this.push('HELLO WORLD');

  // output compressed packet buffer
  zlib.deflate(packetBuffer, function(err, compressed) {
    if (err) {
      return done ? done(err) : this.emit('error', err);
    }
    this.push(compressed);
    if (done) {
      done();
    }
  }.bind(this));
};

// Main transformer logic: buffer chunks and dump them once the
// target size has been met.
transformer._transform  = function(chunk, encoding, done) {
  this._buffers.push(chunk);
  this._inputSize += chunk.length;

  if (this._inputSize >= this._targetSize) {
    this._dump(done);
  } else {
    done();
  }
};

// Flush any remaining buffers. The callback must be passed along,
// otherwise the stream never finishes.
transformer._flush = function(done) {
  this._dump(done);
};

// Example:
var fs = require('fs');
fs.createReadStream('depth_1000000')
  .pipe(zlib.createInflate())
  .pipe(transformer)
  .pipe(fs.createWriteStream('depth_1000000.out'));
Run Code Online (Sandbox Code Playgroud)


mak*_*aco 5

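push returns false when the transform's readable-side buffer has reached its highWaterMark, which happens once the stream it's piped into (in this case, the file output stream) has buffered too much data and the pipe stops reading from the transform. Since you're writing to disk, this makes sense: you're processing data faster than it can be written out.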

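When out's buffer is full, your transform stream's pushes start returning false, and it begins buffering the data itself. If that buffer fills up too, inp's buffer will start to fill. This is how things are supposed to work: piped streams only process data as fast as the slowest link in the chain (once the buffers are full).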
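To see that a false return from push() is advisory rather than a rejection (the data is still queued; false is a request to stop producing), here is a small Readable with a 4-byte highWaterMark:

```javascript
const { Readable } = require('stream');

// A Readable whose buffer is considered "full" at 4 bytes.
const readable = new Readable({ highWaterMark: 4, read() {} });

const first = readable.push('ab');  // 2 bytes buffered
const second = readable.push('cd'); // 4 bytes buffered: limit reached
const third = readable.push('ef');  // still accepted, 6 bytes buffered

console.log(first, second, third);    // true false false
console.log(readable.readableLength); // 6
```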


Anonymous 5

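I recently ran into a similar problem, needing to handle backpressure in an inflating transform stream. The secret to handling push() returning false is to register for and handle the 'drain' event on the stream being piped into: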

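_transform(data, enc, callback) {
  const continueTransforming = () => {
    // ... do some work / parse the data, keep state of where we're at etc.
    // (`event` and `allDone` stand in for that parsing state)
    while (!allDone) {
      // ... produce the next `event` ...
      if (!this.push(event)) {
        // pipes is a single stream or an array, depending on the Node version
        const dest = [].concat(this._readableState.pipes)[0];
        dest.once('drain', continueTransforming); // resume when the reader can take more
        return;
      }
    }
    callback(); // called exactly once, after everything has been pushed
  };
  continueTransforming();
}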
Run Code Online (Sandbox Code Playgroud)

Note that this is a bit hacky, since we're digging into internals, and pipes can even be an array of Writable streams, but it does work in the common case of ....pipe(transform).pipe(...).

It would be great if someone from the Node community could suggest a "correct" way to handle .push() returning false.
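For comparison, a sketch that avoids _readableState.pipes by generalizing the LineTransform answer above: keep the unfinished output of the current chunk as an iterator, stop when push() returns false, and resume from _read(), delegating to super._read() otherwise. PumpingTransform and its expand parameter are hypothetical, and this is one possible pattern under those assumptions, not an officially documented API.

```javascript
const { Transform } = require('stream');

// Hypothetical sketch: `expand(chunk)` returns an iterable of output pieces.
// Pieces are pushed until push() returns false; the rest wait for _read().
class PumpingTransform extends Transform {
  constructor(expand, options) {
    super(options);
    this._expand = expand;
    this._pending = null;  // { iterator, callback } for the chunk in progress
    this._pumping = false; // re-entrancy guard, like _transforming above
  }

  _transform(chunk, encoding, callback) {
    const iterator = this._expand(chunk)[Symbol.iterator]();
    this._pending = { iterator, callback };
    this._pump();
  }

  _pump() {
    this._pumping = true;
    try {
      while (this._pending !== null) {
        const { value, done } = this._pending.iterator.next();
        if (done) {
          const { callback } = this._pending;
          this._pending = null;
          callback(); // exactly once per input chunk
          return;
        }
        if (!this.push(value)) return; // backpressure: wait for _read()
      }
    } finally {
      this._pumping = false;
    }
  }

  _read(size) {
    if (this._pending !== null && !this._pumping) this._pump();
    else super._read(size);
  }
}

// Each input character becomes its own output piece; 4-byte readable limit.
const splitter = new PumpingTransform(
  (chunk) => chunk.toString().split(''),
  { readableHighWaterMark: 4 }
);
splitter.write('abcdefgh');
console.log(splitter.readableLength); // 4: pushing paused at the limit
```

The design choice is that the transform's own readable buffer, not the downstream stream, decides when to pause, so it also works when nothing is piped from the transform.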