NodeJS流和过早结束

alp*_*ogg 3 events stream event-handling node.js

假设NodeJS中的可读流和on('data', ...)与之关联的Data()事件处理程序相对较慢,是否有可能在最后一个数据处理程序完成之前触发End事件,如果是,它是否会过早地终止处理程序?或者,是否会调度并运行所有数据事件?

在我的情况下,我正在处理大文件,并希望每个数据块都提交到数据库.我担心如果在处理程序中的最后一次数据库调用实际完成之前触发了End,我可能会丢失最后一条或两条(或更多条).

Krz*_*pka 7

最后'数据'事件后事件'结束'开火.但它可能会在最后一个数据处理程序完成之前发生.在一个'数据'处理程序完成之前,可能会启动next.这取决于你的代码中有什么,但事后"数据"的后续调用有可能在之前完成.它可能会导致代码中的错误和问题.

示例如何导致问题(对您自己的测试):

  var fs = require('fs');
  var rr = fs.createReadStream('somebigfile.jpg');
  var i=0;
  rr.on('data', function(chunk) {
    i++;
    var s = i;
    console.log('readable:' + s);
    setTimeout(function(){
      console.log('timeout:'+s);
    }, 50-i*10);
  });
  rr.on('end', function() {
    console.log('end');
  });
Run Code Online (Sandbox Code Playgroud)

当启动每个'data'事件处理程序时,它将在您的控制台中打印.完成后几毫秒.完成可能是不同的顺序.

解:

可读流有两种模式"流动模式"和"暂停模式".添加"数据"事件处理程序时,会自动将可读流设置为流动模式.

来自文档:

在流动模式下,数据从底层系统读取并尽快提供给您的程序

在此模式下,事件不会等待您的慢动作完成.对于您的需求是'暂停模式'.

来自文档:

在暂停模式下,您必须显式调用stream.read()以获取数据块.流以暂停模式开始.

换句话说:你需要数据块,你得到它,你使用它,当你准备好时,你要求新的数据块.在此模式下,您可以控制何时获取数据.

如何更改为"暂停模式":

它是此流的默认模式.但是当您注册"数据"事件处理程序时,它会切换到"流动模式".因此,不要使用readstream.on('data',...) 而是readstream.on('readable', function(){...})在它触发时使用,这意味着流已准备好提供大量数据.获取大量数据使用var chunk = readstream.read();

来自docs的示例:

var fs = require('fs');
var rr = fs.createReadStream('foo.txt');
rr.on('readable', function() {
  console.log('readable:', rr.read());
});
rr.on('end', function() {
  console.log('end');
});
Run Code Online (Sandbox Code Playgroud)

请阅读文档以获取更多详细信息,因为当流自动切换到"流动模式"时有更多可能性.

使用慢速处理程序和流动模式:

如果您想要/需要在"流动模式"下工作,还有解决方案.您可以暂停和恢复流.当你获得chunk形式readstream('data'),暂停流,当你完成工作然后恢复它.

文档示例:

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
  readable.pause();
  console.log('there will be no more data for 1 second');
  setTimeout(function() {
    console.log('now data will start flowing again');
    readable.resume();
  }, 1000);
});
Run Code Online (Sandbox Code Playgroud)