如何将节点可读流转换为RX observable

Jua*_*edo 6 javascript node.js rxjs rxjs5

如果我有一个Node js流,比如来自类似process.stdin或来自的东西fs.createReadStream,我如何使用RxJs5将其转换为RxJs Observable流?

我看到RxJs-Node有一个fromReadableStream方法,但看起来它在近一年内没有更新.

Que*_*Roy 12

对于任何寻找这个的人,按照Mark的建议,fromStream为rxjs5 改编了rx-node 实现.

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}
Run Code Online (Sandbox Code Playgroud)

请注意,它本质上打破了流的所有背压功能.Observable是推送技术.所有输入块将被读取并尽快推送给观察者.根据您的情况,它可能不是最佳解决方案.


Vla*_*pov 5

自 Node v11.14.0 流支持for await https://nodejs.org/api/stream.html#stream_read_symbol_asynciterator

这意味着您可以将流传递给from()操作员。

在底层 rxjs(v7.xx) 将调用fromAsyncIterable()它将返回Observable