apx*_*pxp 6 javascript asynchronous stream node.js
这是自定义可读流的实现的简短示例。该类称为MyStream。流从目录中获取文件/文件夹名称,并将值推送到数据事件。
为了比较,我实现了(在此示例中)两种不同的方式/功能。一个是同步的,另一个是异步的。构造函数的第二个参数让您决定使用哪种方式(对于异步,为true,对于同步为false。
readcounter计数调用方法_read的次数。仅提供反馈。
var Readable = require('stream').Readable;
var util = require('util');
var fs = require('fs');
util.inherits(MyStream, Readable);
function MyStream(dirpath, async, opt) {
Readable.call(this, opt);
this.async = async;
this.dirpath = dirpath;
this.counter = 0;
this.readcounter = 0;
}
MyStream.prototype._read = function() {
this.readcounter++;
if (this.async === true){
console.log("Readcounter: " + this.readcounter);
that = this;
fs.readdir(this.dirpath,function(err, files){
that.counter ++;
console.log("Counter: " + that.counter);
for (var i = 0; i < files.length; i++){
that.push(files[i]);
}
that.push(null);
});
} else {
console.log("Readcounter: " + this.readcounter);
files = fs.readdirSync(this.dirpath)
for (var i = 0; i < files.length; i++){
this.push(files[i]);
};
this.push(null);
}
};
//Instance for a asynchronous call
mystream = new MyStream('C:\\Users', true);
mystream.on('data', function(chunk){
console.log(chunk.toString());
});
Run Code Online (Sandbox Code Playgroud)
同步方式的工作方式与预期的一样,但是当我异步调用它时,正在发生一些有趣的事情。每次通过that.push(files[i])
_read方法推送文件名时,都会再次调用。当第一个异步循环完成并that.push(null)
定义流的末尾时,这会导致错误。
我正在测试的环境:节点4.1.1,电子0.35.2。
我不明白为什么_read这样被称为,为什么会这样。也许是一个错误?还是有我目前看不到的东西。有没有一种方法可以通过使用异步函数来构建可读流?异步推送数据块真的很酷,因为这是非阻塞流方式。特别是当您有大量数据时。
_read
每当“读者”需要数据时就会调用,并且通常在推送数据后发生。
我在直接实现时遇到了同样的“问题”,_read
所以现在,我编写了一个返回流对象的函数。它工作得很好,数据不能从我的流中“拉出”,当我决定时数据是可用/推送的。以你的例子,我会这样做:
var Readable = require('stream').Readable;
var fs = require('fs');
function MyStream(dirpath, async, opt) {
var rs = new Readable();
// needed to avoid "Not implemented" exception
rs._read = function() {
// console.log('give me data!'); // << this will print after every console.log(folder);
};
var counter = 0;
var readcounter = 0;
if (async) {
console.log("Readcounter: " + readcounter);
fs.readdir(dirpath, function (err, files) {
counter++;
console.log("Counter: " + counter);
for (var i = 0; i < files.length; i++) {
rs.push(files[i]);
}
rs.push(null);
});
} else {
console.log("Readcounter: " + readcounter);
files = fs.readdirSync(dirpath)
for (var i = 0; i < files.length; i++) {
rs.push(files[i]);
};
rs.push(null);
}
return rs;
}
var mystream = MyStream('C:\\Users', true);
mystream.on('data', function (chunk) {
console.log(chunk.toString());
});
Run Code Online (Sandbox Code Playgroud)
它不会直接回答您的问题,但它是获取工作代码的一种方法。