How to use the drain event of stream.Writable in Node.js

sac*_*hin 12 javascript stream node.js

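In Node.js, I'm using the fs.createWriteStream method to append data to a local file. The Node documentation mentions the drain event in connection with fs.createWriteStream, but I don't understand it.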

var fs = require('fs');
var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

In the code above, how would I use the drain event? Is the event used correctly below?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}

hex*_*ide 24

The drain event is emitted when a writable stream's internal buffer has been emptied.

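This can only happen after the amount of buffered data has reached the stream's highWaterMark, the threshold (in bytes, for streams not in object mode) at which write() starts returning false to tell the producer to stop writing until the buffer empties. It is a signal rather than a hard limit: the stream still accepts and buffers data written past it.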
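To see the return value in action, here is a minimal sketch with an in-memory `Writable` whose `write` implementation never calls its callback (a hypothetical stalled destination) and a `highWaterMark` of 10 bytes chosen only for illustration. `write()` keeps returning `true` until the buffered length reaches 10, and the third chunk is still accepted even though it pushes the buffer past the threshold.

```javascript
const { Writable } = require('stream');

// Hypothetical stalled destination: the first chunk is never acknowledged,
// so every later chunk stays in the internal buffer.
const sink = new Writable({
  highWaterMark: 10, // tiny threshold, for illustration only
  write(chunk, encoding, callback) {
    // intentionally never calls callback()
  },
});

const results = [];
for (const piece of ['abcd', 'efgh', 'ijkl']) {
  // write() returns false once the buffered length reaches highWaterMark,
  // but the chunk is buffered either way.
  results.push({ ok: sink.write(piece), buffered: sink.writableLength });
}
console.log(results);
// [ { ok: true, buffered: 4 }, { ok: true, buffered: 8 }, { ok: false, buffered: 12 } ]
```

`writableLength` exposes the number of buffered bytes, the same length value Node compares against `highWaterMark` internally.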

This typically happens when data is read from one stream faster than it can be written to another resource. For example, take two streams:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

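Now suppose the file read is on an SSD that can be read at 500MB/s, while write is on an HDD that can only be written at 150MB/s. The write stream won't be able to keep up and will start storing data in its internal buffer. Once the buffer reaches the highWaterMark (16KB by default at the time of writing; writable.writableHighWaterMark reports the actual value), write() starts returning false and further data is queued internally. Once the internal buffer's length drops back to 0, the drain event is fired.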
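A sketch of the SSD/HDD situation under simplified assumptions: the "HDD" is a hypothetical in-memory `Writable` that takes 5 ms per chunk, and `highWaterMark` is shrunk to 8 bytes so the effect appears after a couple of writes. The loop deliberately ignores `write()`'s return value so you can watch it flip to `false`; `drain` only fires once the buffered length is back to 0.

```javascript
const { Writable } = require('stream');

// Hypothetical slow destination standing in for the HDD:
// every chunk takes 5 ms to "write".
const slow = new Writable({
  highWaterMark: 8, // shrunk so the buffer fills after two 4-byte chunks
  write(chunk, encoding, callback) {
    setTimeout(callback, 5);
  },
});

const events = [];

// Resolves the first time the buffer has been completely flushed.
const drained = new Promise((resolve) => {
  slow.once('drain', () => {
    events.push(`drain, buffered=${slow.writableLength}`);
    resolve(events);
  });
});

// Write faster than the sink can keep up. The return value is only
// recorded here so we can see when the buffer crosses highWaterMark.
for (let i = 0; i < 4; i++) {
  events.push(`write ${i}: ${slow.write('abcd')}`); // 4 bytes each
}

drained.then((log) => console.log(log));
// [ 'write 0: true', 'write 1: false', 'write 2: false', 'write 3: false', 'drain, buffered=0' ]
```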

Here is how drain is emitted inside Node's Writable implementation:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

And this is the precondition for drain, set as part of the writeOrBuffer function:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
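The two snippets above can be combined into a toy model. This is a hypothetical, heavily simplified class (`ToyWritable` and its `flushed()` method are made up for illustration), not Node's actual `Writable`; it only reproduces the `length`/`needDrain` bookkeeping. It sets the flag with `if (!ret) needDrain = true` rather than `needDrain = !ret`, so a later small write cannot clear a drain that is already pending; later Node versions write it the same way.

```javascript
const { EventEmitter } = require('events');

// Hypothetical toy model of Writable's drain bookkeeping (NOT Node's real class).
class ToyWritable extends EventEmitter {
  constructor(highWaterMark) {
    super();
    this.state = { length: 0, highWaterMark, needDrain: false };
  }

  // Mirrors writeOrBuffer: account for the chunk, then report
  // whether the caller may keep writing.
  write(chunk) {
    const state = this.state;
    state.length += chunk.length;
    const ret = state.length < state.highWaterMark;
    if (!ret) state.needDrain = true; // never reset a pending drain here
    return ret;
  }

  // Hypothetical hook: the underlying resource has flushed `n` bytes.
  flushed(n) {
    const state = this.state;
    state.length -= n;
    if (state.length === 0 && state.needDrain) {
      state.needDrain = false;
      this.emit('drain');
    }
  }
}

const toy = new ToyWritable(10);
const log = [];
toy.on('drain', () => log.push('drain'));
log.push(toy.write('abcdef')); // length 6  -> true
log.push(toy.write('ghijkl')); // length 12 -> false, needDrain set
toy.flushed(6);                // length 6, no drain yet
toy.flushed(6);                // length 0 -> 'drain'
console.log(log); // [ true, false, 'drain' ]
```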

To see how the drain event is meant to be used, look at this example from the Node.js documentation:

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

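The function's goal is to write data to a writable stream 1,000,000 times. A variable ok is set to true, and the loop keeps going only while ok is true. On each iteration, ok is set to the return value of writer.write(), which is false when the caller should wait for drain. If ok becomes false, the loop stops and a handler is registered for the drain event; when drain fires, it resumes writing where it left off.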
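To run the documentation's loop quickly, here is the same function with a hypothetical `count` parameter in place of the hard-coded 1,000,000, driven against an in-memory sink (also an assumption) with a 64-byte `highWaterMark`. The sink counts bytes and `drain` events, which shows the loop pausing and resuming many times while still delivering every write.

```javascript
const { Writable } = require('stream');

// Same loop as the documentation example, with a hypothetical `count` parameter.
function writeManyTimes(writer, count, data, encoding, callback) {
  let i = count;
  write();
  function write() {
    let ok = true;
    do {
      i -= 1;
      if (i === 0) {
        writer.write(data, encoding, callback); // last write carries the callback
      } else {
        ok = writer.write(data, encoding); // false => buffer full, stop for now
      }
    } while (i > 0 && ok);
    if (i > 0) {
      writer.once('drain', write); // resume where we left off
    }
  }
}

// Hypothetical in-memory sink that acknowledges each chunk asynchronously.
let bytes = 0;
let drains = 0;
const sink = new Writable({
  highWaterMark: 64,
  write(chunk, encoding, cb) {
    bytes += chunk.length;
    setImmediate(cb);
  },
});
sink.on('drain', () => { drains += 1; });

const done = new Promise((resolve) => {
  writeManyTimes(sink, 1000, 'hello', 'utf8', () => resolve({ bytes, drains }));
});
done.then((r) => console.log(r)); // bytes is 5000; drains is greater than 0
```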


Regarding your code specifically, you don't need the drain event, because you write only once right after opening the stream. Since nothing has been written yet, the internal buffer is empty, and you would have to write at least 16KB (the default highWaterMark) before write() returns false and a drain event becomes pending. The drain event matters when you write many times and your data can outpace the stream, pushing its buffer past the highWaterMark.
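A quick way to check this, using a hypothetical sink that accepts every chunk immediately: a 15-byte write on an empty stream returns `true`, while a single write of exactly `writableHighWaterMark` bytes returns `false` and is followed by `drain`.

```javascript
const { Writable } = require('stream');

// Hypothetical sink that acknowledges every chunk immediately.
function makeSink() {
  return new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });
}

const a = makeSink();
const smallOk = a.write('this is my data'); // 15 bytes on an empty buffer

const b = makeSink();
const big = 'x'.repeat(b.writableHighWaterMark); // exactly highWaterMark bytes
const bigOk = b.write(big);

// Even one oversized write makes a drain pending.
const drained = new Promise((resolve) => b.once('drain', resolve));
console.log({ smallOk, bigOk }); // { smallOk: true, bigOk: false }
```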


Lau*_*rin 8

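Imagine you're connecting two streams with different bandwidths, for example uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.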

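In this situation, Node.js keeps the data in memory until the slow stream gets a chance to process it. This can become a problem if the file is very large.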

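To avoid this, Stream.write returns false when the stream's buffer is full (has reached its highWaterMark). If you stop writing, the stream will later emit a drain event to signal that the buffer has been emptied and it's appropriate to write again.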

You can pause() and resume() the readable stream to control how fast it produces data.
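A minimal sketch of that manual approach, assuming a hypothetical fast source (`Readable.from` over 20 short strings) and a hypothetical slow sink (2 ms per chunk, 16-byte `highWaterMark`). The source is paused whenever `write()` returns `false` and resumed on `drain`, which is essentially the handshake `pipe()` performs for you.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical fast source: 20 chunks produced as fast as possible.
const source = Readable.from(Array.from({ length: 20 }, (_, i) => `chunk-${i}\n`));

// Hypothetical slow sink: each chunk takes 2 ms.
const received = [];
const sink = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 2);
  },
});

let pauses = 0;
const done = new Promise((resolve) => {
  source.on('data', (chunk) => {
    if (!sink.write(chunk)) {
      // Sink buffer is full: stop the source until the sink drains.
      pauses += 1;
      source.pause();
      sink.once('drain', () => source.resume());
    }
  });
  source.on('end', () => sink.end());
  sink.on('finish', () => resolve({ count: received.length, pauses }));
});
done.then((r) => console.log(r)); // count is 20; pauses is greater than 0
```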

Better still: use readable.pipe(writable), which does this for you.
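A sketch of the same idea with `stream.pipeline` (the promise version from `stream/promises`, Node.js 15+), swapped in for `pipe()` because it also forwards errors and destroys both streams when one fails. The source and slow sink are hypothetical in-memory stand-ins for the file and socket.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical fast source and slow sink, as in the upload scenario.
const source = Readable.from(Array.from({ length: 50 }, (_, i) => `line ${i}\n`));

let received = 0;
const sink = new Writable({
  highWaterMark: 32,
  write(chunk, encoding, callback) {
    received += 1;
    setTimeout(callback, 1);
  },
});

// pipeline() waits for 'drain' when write() returns false, like pipe(),
// and rejects if either stream errors.
const done = pipeline(source, sink).then(() => received);
done.then((n) => console.log(`wrote ${n} chunks`)); // wrote 50 chunks
```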

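Edit: there's a bug in your code. Whatever write returns, your data has already been accepted by the stream (buffered if necessary), so you don't need to retry it. As written, your code writes data twice.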
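A sketch of the corrected logic, using a hypothetical recording sink with a tiny `highWaterMark` so that even this short string makes `write()` return `false`. The data is written once; waiting for `drain` only gates whatever you write next.

```javascript
const { Writable } = require('stream');

// Hypothetical sink that records everything it receives.
const received = [];
const stream = new Writable({
  highWaterMark: 4, // tiny, so this short string already makes write() return false
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  },
});

const data = 'this is my data';
const result = stream.write(data); // false here, but the data is already queued

const ready = new Promise((resolve) => {
  if (result) {
    resolve();
  } else {
    // Do NOT write `data` again; only wait before writing something new.
    stream.once('drain', resolve);
  }
}).then(() => received.join(''));

ready.then((text) => console.log(text)); // this is my data
```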

Something like this would work:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}
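For a self-contained run of the same pattern, here is a variant that loops instead of calling itself synchronously, so a long run of successful writes doesn't deepen the call stack. The packet list and in-memory sink are hypothetical stand-ins for the elided `packets` array and the file stream.

```javascript
const { Writable } = require('stream');

// Loop-based variant of niceWrite: write until write() returns false,
// then wait for 'drain' and continue from the next packet.
function writeAll(stream, packets, done) {
  let current = 0;
  function next() {
    while (current < packets.length) {
      const canContinue = stream.write(packets[current]);
      current += 1;
      if (!canContinue) {
        stream.once('drain', next); // wait until the stream drains to continue
        return;
      }
    }
    stream.end(done); // done runs once everything has been flushed
  }
  next();
}

// Hypothetical in-memory sink.
const received = [];
const sink = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  },
});

const packets = ['alpha', 'bravo', 'charlie', 'delta', 'echo'];
const finished = new Promise((resolve) => writeAll(sink, packets, resolve));
finished.then(() => console.log(received.join(','))); // alpha,bravo,charlie,delta,echo
```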


Ste*_*par 6

Here's a version using async/await:

const fs = require('fs')

const write = (writer, data) => {
  return new Promise((resolve) => {
    // write() returns false once the buffer reaches highWaterMark;
    // in that case resolve only after 'drain'
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    // write() accepts strings and Buffers, not numbers
    await write(write_stream, String(current++))
  }
  write_stream.end()
}

run()

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73
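A variation on the helper above, assuming a recent Node.js version: `events.once()` returns a promise for the next `drain`, and it rejects if the stream emits `error` while waiting, whereas the hand-rolled promise would stay pending forever. The in-memory sink is a hypothetical stand-in for the file.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Wait for 'drain' only when write() asks us to.
async function write(writer, data) {
  if (!writer.write(data)) {
    await once(writer, 'drain'); // rejects if 'error' is emitted first
  }
}

// Hypothetical in-memory sink used in place of a file.
let chunks = 0;
const sink = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    chunks += 1;
    setImmediate(callback);
  },
});

async function run(max) {
  for (let current = 0; current <= max; current++) {
    await write(sink, String(current)); // write() needs a string or Buffer
  }
  // end(callback) invokes the callback once everything has been flushed
  await new Promise((resolve) => sink.end(resolve));
  return chunks;
}

const done = run(1000);
done.then((n) => console.log(n)); // 1001
```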