将缓冲区转换为nodejs中的ReadableStream

Mas*_*iar 62 javascript buffer stream node.js

我对Buffers和ReadableStreams很新,所以这可能是一个愚蠢的问题.我有一个库作为ReadableStream的输入,但我的输入只是一个base64格式的图像.我可以像这样转换Buffer中的数据:

var img = new Buffer(img_string, 'base64');
Run Code Online (Sandbox Code Playgroud)

但我不知道如何将其转换为ReadableStream或将我获得的Buffer转换为ReadableStream.

有没有办法做到这一点,还是我试图实现不可能的?

谢谢.

ark*_*dyt 104

对于 nodejs 10.17.0 及更高版本:

const { Readable } = require('stream');

const stream = Readable.from(myBuffer.toString());
Run Code Online (Sandbox Code Playgroud)

  • 是的,这是最好的。但我认为 .toString() 没有必要。 (17认同)
  • 对于导入: const { Readable } = require('stream') (6认同)
  • 它适用于 OP 描述的情况,但如果缓冲区包含二进制数据“.toString()”将损坏它 (4认同)
  • 你是最棒的❤️ (2认同)
  • 这对图像文件有影响吗? (2认同)

Mr5*_*5o1 38

像这样的东西......

import { Readable } from 'stream'

const buffer = new Buffer(img_string, 'base64')
const readable = new Readable()
readable._read = () => {} // _read is required but you can noop it
readable.push(buffer)
readable.push(null)

readable.pipe(consumer) // consume the stream
Run Code Online (Sandbox Code Playgroud)

在一般过程中,可读流的_read功能应该从底层源收集数据并push逐步增加,以确保在需要之前不会将大量源存入内存.

在这种情况下,虽然您已经在内存中有源,但_read不是必需的.

推送整个缓冲区只是将它包装在可读的流api中.

  • 将_read()方法中的缓冲区推入(push())会更正确吗?即`reader._read =()=> {read.push(buffer); visible.push(null);}`。不确定是否重要,但是允许流管理数据馈入的时间似乎不太可能发生意外行为。除此之外,这应该是公认的答案,因为它不依赖于第三方模块。 (2认同)

Bry*_*sen 36

节点流缓冲器显然设计用于测试; 无法避免延迟使其成为生产使用的不良选择.

Gabriel Llamas在这个答案中建议使用流化器: 如何将缓冲区包装为stream2可读流?


Iho*_*yuk 23

您可以为此使用标准 NodeJS 流 API -stream.Readable.from

const { Readable } = require('stream');
const stream = Readable.from(buffer);
Run Code Online (Sandbox Code Playgroud)

注意:如果缓冲区包含二进制数据,请勿将缓冲区转换为字符串 ( buffer.toString())。它将导致二进制文件损坏。


van*_*ome 19

您可以使用Node Stream Buffers创建一个ReadableStream,如下所示:

// Initialize stream
var myReadableStreamBuffer = new streamBuffers.ReadableStreamBuffer({
  frequency: 10,      // in milliseconds.
  chunkSize: 2048     // in bytes.
}); 

// With a buffer
myReadableStreamBuffer.put(aBuffer);

// Or with a string
myReadableStreamBuffer.put("A String", "utf8");
Run Code Online (Sandbox Code Playgroud)

频率不能为0,因此会引入一定的延迟.

  • 毫秒不是频率的量度-我认为它们是指周期。 (2认同)

Shw*_*har 6

这是一个使用streamifier模块的简单解决方案。

const streamifier = require('streamifier');
streamifier.createReadStream(new Buffer ([97, 98, 99])).pipe(process.stdout);
Run Code Online (Sandbox Code Playgroud)

您可以使用字符串、缓冲区和对象作为其参数。


Joe*_*kes 6

您不需要为单个文件添加整个 npm lib。我将它重构为打字稿:

import { Readable, ReadableOptions } from "stream";

export class MultiStream extends Readable {
  _object: any;
  constructor(object: any, options: ReadableOptions) {
    super(object instanceof Buffer || typeof object === "string" ? options : { objectMode: true });
    this._object = object;
  }
  _read = () => {
    this.push(this._object);
    this._object = null;
  };
}
Run Code Online (Sandbox Code Playgroud)

基于节点流化器(如上所述的最佳选择)。


Ric*_*gis 6

这是我的简单代码。

import { Readable } from 'stream';

const newStream = new Readable({
                    read() {
                      this.push(someBuffer);
                    },
                  })
Run Code Online (Sandbox Code Playgroud)