Mon*_*key 55 node.js node.js-stream
我想将数据从亚马逊kinesis流传输到s3日志或bunyan日志.
该示例使用文件写入流或标准输出.我怎么会扼杀我自己的可写流?
//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)
Run Code Online (Sandbox Code Playgroud)
这不起作用说没有'on'方法
var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
console.log(data);
};
kinesisSource.pipe(stream);
Run Code Online (Sandbox Code Playgroud)
我必须为我自己的自定义可写流实现什么方法,文档似乎表明我需要实现'write'而不是'on'
Pau*_*gel 113
要创建自己的可写流,您有三种可能性.
为此,您需要1)扩展Writable类2)以在您自己的构造函数中调用Writable构造函数3)_write()在流对象的原型中定义一个方法.
这是一个例子:
var stream = require('stream');
var util = require('util');
function EchoStream () { // step 2
stream.Writable.call(this);
};
util.inherits(EchoStream, stream.Writable); // step 1
EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
console.log(chunk.toString());
done();
}
var myStream = new EchoStream(); // instanciate your brand new stream
process.stdin.pipe(myStream);
Run Code Online (Sandbox Code Playgroud)
您可以实例化空Writable对象并实现该_write()方法,而不是定义新的对象类型:
var stream = require('stream');
var echoStream = new stream.Writable();
echoStream._write = function (chunk, encoding, done) {
console.log(chunk.toString());
done();
};
process.stdin.pipe(echoStream);
Run Code Online (Sandbox Code Playgroud)
如果您使用的是io.js,则可以使用简化的构造函数API:
var writable = new stream.Writable({
write: function(chunk, encoding, next) {
console.log(chunk.toString());
next();
}
});
Run Code Online (Sandbox Code Playgroud)
class EchoStream extends stream.Writable {
_write(chunk, enc, next) {
console.log(chunk.toString());
next();
}
}
Run Code Online (Sandbox Code Playgroud)
小智 10
实际上创建可写流非常简单.这是一个例子:
var fs = require('fs');
var Stream = require('stream');
var ws = new Stream;
ws.writable = true;
ws.bytes = 0;
ws.write = function(buf) {
ws.bytes += buf.length;
}
ws.end = function(buf) {
if(arguments.length) ws.write(buf);
ws.writable = false;
console.log('bytes length: ' + ws.bytes);
}
fs.createReadStream('file path').pipe(ws);
Run Code Online (Sandbox Code Playgroud)
另外,如果你想创建自己的课程,@ Paul会给出一个很好的答案.
小智 5
这是直接来自nodejs文档的示例
https://nodejs.org/api/stream.html#an-example-writable-stream
const { Writable } = require('stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
Run Code Online (Sandbox Code Playgroud)