Bra*_*rad 21 javascript stream node.js
我正在尝试使用新的Node.js流API实现一个流,它将缓冲一定数量的数据.当此流通过管道传输到另一个流时,或者某些内容消耗了readable事件时,此流应刷新其缓冲区,然后简单地成为传递.问题是,此流将通过管道传输到许多其他流,并且当连接每个目标流时,即使缓冲区已经刷新到另一个流,也必须刷新缓冲区.
例如:
BufferStream实现stream.Transform,并保留512KB内部环缓冲区ReadableStreamA 通过管道输送到一个实例 BufferStreamBufferStream写入其环形缓冲区,从中读取数据ReadableStreamA.(数据丢失无关紧要,因为缓冲区会覆盖旧数据.)BufferStream 是通过管道输送的 WritableStreamBWritableStreamB接收整个512KB缓冲区,并在从中ReadableStreamA通过写入时继续获取数据BufferStream.BufferStream 是通过管道输送的 WritableStreamCWritableStreamC还接收整个512KB缓冲区,但此缓冲区现在与WritableStreamB收到的缓冲区不同,因为此后已写入更多数据BufferStream.流API是否可以实现?我能想到的唯一方法是创建一个带有方法的对象,该方法为每个目标创建一个新的PassThrough流,这意味着我不能简单地管道输入和输出.
对于它的价值,我通过简单地监听data事件的新处理程序,使用旧的"流动"API完成了这项工作.当附加一个新函数时.on('data'),我会直接使用环形缓冲区的副本来调用它.
这是我对你的问题的看法.
基本思想是创建一个Transform流,这将允许我们在流输出上发送数据之前执行自定义缓冲逻辑:
var util = require('util')
var stream = require('stream')
var BufferStream = function (streamOptions) {
stream.Transform.call(this, streamOptions)
this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)
BufferStream.prototype._transform = function (chunk, encoding, done) {
// custom buffering logic
// ie. add chunk to this.buffer, check buffer size, etc.
this.buffer = new Buffer(chunk)
this.push(chunk)
done()
}
Run Code Online (Sandbox Code Playgroud)
然后,我们需要覆盖该.pipe()方法,以便在通过BufferStream管道输入流时通知我们,这允许我们自动向其写入数据:
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
res.write(this.buffer)
return res
}
Run Code Online (Sandbox Code Playgroud)
这样,当我们写入时buffer.pipe(someStream),我们按预期执行管道并将内部缓冲区写入输出流.在那之后,Transform班级负责处理所有事情,同时跟踪背压和诸如此类的事情.
这是一个工作要点.请注意,我没有打扰写一个正确的缓冲逻辑(即我不关心内部缓冲区的大小),但这应该很容易修复.
| 归档时间: |
|
| 查看次数: |
2534 次 |
| 最近记录: |