如何在rxjs中使用节点的变换流?

shr*_*iek 6 stream node.js rxjs

我已经玩rxjs了一段时间了,我喜欢如何使用它的逻辑运算符而不是命令式编程.但是,我也喜欢节点的流,它也是高度可组合的,所以我明显的反应就是使用它们但我还没有看到它被提到很多(实际上,我根本没有)除了在rxjs的书中绑定它.

所以,我的问题是,如何利用RxJS上npm中的所有变换流?或者,它甚至可能吗?
例:-

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var src = fs.createReadStream('./myFile.csv');
src.pipe(csv).pipe(process.stdout);
Run Code Online (Sandbox Code Playgroud)

基本上,我想这样做: -

var fs = require('fs');
var csv = require('csv-parse')({delimiter:';'});
var rx= require('rx-node');
var src = fs.createReadStream('./myFile.csv');

var obj = rx.fromReadableStream(src);
obj.pipe(csb).map(x=>console.log(x));
Run Code Online (Sandbox Code Playgroud)

我被告知highland过去使用但我在rxjs这里严格寻找解决方案.

edi*_*n-m 5

您不必使用,rx-node但可以!记住:All streams are event emitters!

准备: input.txt

Hello World!
Hello World!
Hello World!
Hello World!
Hello World!
Run Code Online (Sandbox Code Playgroud)

跑步:

npm install through2 split2 rx rx-node
Run Code Online (Sandbox Code Playgroud)

并在index.js

var Rx = require('rx');
Rx.Node = require('rx-node');
var fs = require('fs');
var th2 = require('through2');
var split2 = require('split2');

var file = fs.createReadStream('./input.txt').on('error', console.log.bind(console, 'fs err'));

var transform = th2(function(ch, en, cb) {
  cb(null, ch.toString());
}).on('error', function(err) {
  console.log(err, err.toString());
});

// All streams are event emitters ! (one way without using rx-node)
// var subs = Rx.Observable.fromEvent(transform, 'data').share();
// subs
//   .map(value => 'Begin line: ' + value)
//   .subscribe(value => console.log(value));

// rx-node has convenience functions (another way)
var subs = Rx.Node.fromTransformStream(transform).share()
  .map(value => 'Begin line: ' + value)
  .subscribe(value => console.log(value));

file.pipe(split2()).pipe(transform);
Run Code Online (Sandbox Code Playgroud)

输出:

Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Begin line: Hello World!
Run Code Online (Sandbox Code Playgroud)