如何实现可写流

Mon*_*key 55 node.js node.js-stream

我想将数据从亚马逊kinesis流传输到s3日志或bunyan日志.

该示例使用文件写入流或标准输出.我怎么会扼杀我自己的可写流?

//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)
Run Code Online (Sandbox Code Playgroud)

这不起作用说没有'on'方法

var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
    console.log(data);
};
kinesisSource.pipe(stream);
Run Code Online (Sandbox Code Playgroud)

我必须为我自己的自定义可写流实现什么方法,文档似乎表明我需要实现'write'而不是'on'

Pau*_*gel 113

要创建自己的可写流,您有三种可能性.

创建自己的类

为此,您需要1)扩展Writable类2)以在您自己的构造函数中调用Writable构造函数3)_write()在流对象的原型中定义一个方法.

这是一个例子:

var stream = require('stream');
var util = require('util');

function EchoStream () { // step 2
  stream.Writable.call(this);
};
util.inherits(EchoStream, stream.Writable); // step 1
EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
  console.log(chunk.toString());
  done();
}

var myStream = new EchoStream(); // instanciate your brand new stream
process.stdin.pipe(myStream);
Run Code Online (Sandbox Code Playgroud)

扩展一个空的Writable对象

您可以实例化空Writable对象并实现该_write()方法,而不是定义新的对象类型:

var stream = require('stream');
var echoStream = new stream.Writable();
echoStream._write = function (chunk, encoding, done) {
  console.log(chunk.toString());
  done();
};

process.stdin.pipe(echoStream);
Run Code Online (Sandbox Code Playgroud)

使用Simplified Constructor API

如果您使用的是io.js,则可以使用简化的构造函数API:

var writable = new stream.Writable({
  write: function(chunk, encoding, next) {
    console.log(chunk.toString());
    next();
  }
});
Run Code Online (Sandbox Code Playgroud)

在节点4+中使用ES6类

class EchoStream extends stream.Writable {
  _write(chunk, enc, next) {
    console.log(chunk.toString());
    next();
  }
}
Run Code Online (Sandbox Code Playgroud)

  • 支持对象模式每`chunk.toString替换`chunk.toString()`?chunk.toString():chunk` (2认同)

小智 10

实际上创建可写流非常简单.这是一个例子:

var fs = require('fs');
var Stream = require('stream');

var ws = new Stream;
ws.writable = true;
ws.bytes = 0;

ws.write = function(buf) {
   ws.bytes += buf.length;
}

ws.end = function(buf) {
   if(arguments.length) ws.write(buf);
   ws.writable = false;

   console.log('bytes length: ' + ws.bytes);
}

fs.createReadStream('file path').pipe(ws);
Run Code Online (Sandbox Code Playgroud)

另外,如果你想创建自己的课程,@ Paul会给出一个很好的答案.


小智 5

这是直接来自nodejs文档的示例
https://nodejs.org/api/stream.html#an-example-writable-stream

const { Writable } = require('stream');
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)