ace*_*686 8 stream node.js node.js-stream
我一直在尝试使用可读和转换流来处理非常大的文件.我似乎遇到的问题是,如果我不在最后放置可写流,程序似乎在返回结果之前终止.
示例: rstream.pipe(split()).pipe(tstream)
我tstream的发射器在计数器达到阈值时发出.当该阈值设置为较低的数字时,我得到一个结果,但是当它达到高时,它不会返回任何结果.如果我将它传递给文件编写器,它总是返回一个结果.我错过了一些明显的东西吗
码:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
Run Code Online (Sandbox Code Playgroud)
这是代码 Qtransformstream
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;
Run Code Online (Sandbox Code Playgroud)
编辑:
此外,我不知道您的计数器有多高,但如果填满缓冲区,它将停止将数据传递给转换流,在这种情况下,completed由于您从未达到计数器限制,因此从未实际执行过.尝试改变你的highwatermark.
编辑2:一个更好的解释
众所周知,a transform stream 是双工流,基本上意味着它可以接受来自源的数据,并且可以将数据发送到目的地.这通常分别称为读写.在transform stream同时从继承read stream和write stream由Node.js的实现 但有一点需要注意,transform stream 不必实现_read或_write函数.从这个意义上说,你可以把它想象成鲜为人知的直通流.
如果您考虑transform stream实现的事实,write stream您还必须考虑写入流始终具有转储其内容的目标的事实.您遇到的问题是,在创建时,您transform stream无法指定发送内容的位置. 将数据完全传递到转换流的唯一方法是将其传输到写入流,否则,实质上您的流会被备份并且无法接受更多数据,因为没有数据存储的地方.
这就是为什么当你管道到写入流时它始终有效.写入流通过将数据发送到目的地来减轻数据备份,因此所有数据都将通过管道传输,并且将发出完成事件.
当样本量很小时,代码在没有写入流的情况下工作的原因是您没有填充流,因此转换流可以接受足够的数据以允许命中完整的事件/阈值.随着阈值的增加,您的流可以接受而不将其发送到另一个地方(写入流)的数据量保持不变.这会导致您的流备份,并且它不再接受数据,这意味着永远不会发出已完成的事件.
我冒昧地说,如果你增加你highwatermark的变换流,你将能够提高你的门槛,仍然有代码工作.但这种方法不正确.将您的流传输到写入流,该写入流将数据发送到dev/null,以创建写入流的方式:
var writer = fs.createWriteStream('/dev/null');
Run Code Online (Sandbox Code Playgroud)
Node.js文档中有关缓冲的部分解释了您遇到的错误.
| 归档时间: |
|
| 查看次数: |
1366 次 |
| 最近记录: |