Node.js:如何将流读入缓冲区?

Gal*_*aim 44 node.js

我写了一个非常简单的函数,从给定的URL下载图像,调整大小并上传到S3(使用'gm'和'knox'),我不知道我是否正在正确地读取流到缓冲区.(一切正常,但这是正确的方法吗?)

另外,我想了解一下事件循环,我怎么知道函数的一次调用不会泄漏任何东西或者将'buf'变量更改为另一个已经运行的调用(或者这种情况是不可能的,因为回调是匿名的功能?)

var http = require('http');
var https = require('https');
var s3 = require('./s3');
var gm = require('gm');

module.exports.processImageUrl = function(imageUrl, filename, callback) {
var client = http;
if (imageUrl.substr(0, 5) == 'https') { client = https; }

client.get(imageUrl, function(res) {
    if (res.statusCode != 200) {
        return callback(new Error('HTTP Response code ' + res.statusCode));
    }

    gm(res)
        .geometry(1024, 768, '>')
        .stream('jpg', function(err, stdout, stderr) {
            if (!err) {
                var buf = new Buffer(0);
                stdout.on('data', function(d) {
                    buf = Buffer.concat([buf, d]);
                });

                stdout.on('end', function() {
                    var headers = {
                        'Content-Length': buf.length
                        , 'Content-Type': 'Image/jpeg'
                        , 'x-amz-acl': 'public-read'
                    };

                    s3.putBuffer(buf, '/img/d/' + filename + '.jpg', headers, function(err, res) {
                        if(err) {
                            return callback(err);
                        } else {
                            return callback(null, res.client._httpMessage.url);
                        }
                    });
                });
            } else {
                callback(err);
            }
        });
    }).on('error', function(err) {
        callback(err);
    });
};
Run Code Online (Sandbox Code Playgroud)

log*_*yth 73

总的来说,我没有看到任何会破坏您的代码的东西.

两个建议:

组合Buffer对象的方式不是最理想的,因为它必须复制每个"数据"事件上的所有预先存在的数据.将块放在一个数组中最好将concat它们全部放在最后.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
}
Run Code Online (Sandbox Code Playgroud)

为了提高性能,我会调查您使用的S3库是否支持流.理想情况下,您根本不需要创建一个大缓冲区,而只是将stdout流直接传递给S3库.

至于你问题的第二部分,那是不可能的.调用函数时,会为其分配自己的私有上下文,并且只能从该函数内定义的其他项访问其中定义的所有内容.

更新

将文件转储到文件系统可能意味着每个请求的内存使用量更少,但文件IO可能非常慢,因此可能不值得.我要说你不应该优化太多,直到你能分析和压力测试这个功能.如果垃圾收集器正在执行其工作,您可能会过度优化.

尽管如此,还是有更好的方法,所以不要使用文件.由于您只需要长度,因此无需将所有缓冲区附加在一起即可计算出来,因此您根本不需要分配新的缓冲区.

var pause_stream = require('pause-stream');

// Your other code.

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var contentLength = bufs.reduce(function(sum, buf){
    return sum + buf.length;
  }, 0);

  // Create a stream that will emit your chunks when resumed.
  var stream = pause_stream();
  stream.pause();
  while (bufs.length) stream.write(bufs.shift());
  stream.end();

  var headers = {
      'Content-Length': contentLength,
      // ...
  };

  s3.putStream(stream, ....);
Run Code Online (Sandbox Code Playgroud)


Ric*_*ler 34

注意:这仅回答“如何将流读入缓冲区?” 并忽略原始问题的上下文。

ES2018答案

从 Node 11.14.0 开始,可读流支持异步迭代器

const buffers = [];

// node.js readable streams implement the async iterator protocol
for await (const data of readableStream) {
  buffers.push(data);
}

const finalBuffer = Buffer.concat(buffers);
Run Code Online (Sandbox Code Playgroud)

奖励:将来,通过stage 2 stage 3Array.fromAsync提案,这可能会变得更好。

//  DOES NOT WORK (yet!)
const finalBuffer = Buffer.concat(await Array.fromAsync(readableStream));
Run Code Online (Sandbox Code Playgroud)


bso*_*ino 9

Javascript 片段

function stream2buffer(stream) {

    return new Promise((resolve, reject) => {
        
        const _buf = [];

        stream.on("data", (chunk) => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", (err) => reject(err));

    });
} 
Run Code Online (Sandbox Code Playgroud)

打字稿片段

async function stream2buffer(stream: Stream): Promise<Buffer> {

    return new Promise < Buffer > ((resolve, reject) => {
        
        const _buf = Array < any > ();

        stream.on("data", chunk => _buf.push(chunk));
        stream.on("end", () => resolve(Buffer.concat(_buf)));
        stream.on("error", err => reject(`error converting stream - ${err}`));

    });
} 
Run Code Online (Sandbox Code Playgroud)

  • 这效果非常好...这就是 MVP (3认同)

Tib*_*tan 7

如果要从http(s)URI中提取,则可以使用node-fetch轻松完成此操作。

从自述文件:

fetch('https://assets-cdn.github.com/images/modules/logos_page/Octocat.png')
    .then(res => res.buffer())
    .then(buffer => console.log)
Run Code Online (Sandbox Code Playgroud)

  • 您还可以滥用节点获取中的“Response”从*任何*流中获取缓冲区,而不仅仅是http:“new Response(stream).buffer()”。 (2认同)

SLI*_*med 7

您可以将可读流转换为缓冲区,并以异步方式将其集成到代码中,如下所示。

async streamToBuffer (stream) {
    return new Promise((resolve, reject) => {
      const data = [];

      stream.on('data', (chunk) => {
        data.push(chunk);
      });

      stream.on('end', () => {
        resolve(Buffer.concat(data))
      })

      stream.on('error', (err) => {
        reject(err)
      })
   
    })
  }
Run Code Online (Sandbox Code Playgroud)

用法很简单:

 // usage
  const myStream // your stream
  const buffer = await streamToBuffer(myStream) // this is a buffer
Run Code Online (Sandbox Code Playgroud)


Mad*_*cks 5

我建议 loganfsmyths 方法,使用数组来保存数据。

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
}
Run Code Online (Sandbox Code Playgroud)

在我当前的工作示例中,我正在使用 GRIDfs 和 npm 的 Jimp。

   var bucket = new GridFSBucket(getDBReference(), { bucketName: 'images' } );
    var dwnldStream = bucket.openDownloadStream(info[0]._id);// original size
  dwnldStream.on('data', function(chunk) {
       data.push(chunk);
    });
  dwnldStream.on('end', function() {
    var buff =Buffer.concat(data);
    console.log("buffer: ", buff);
       jimp.read(buff)
.then(image => {
         console.log("read the image!");
         IMAGE_SIZES.forEach( (size)=>{
         resize(image,size);
         });
});

Run Code Online (Sandbox Code Playgroud)

我做了一些其他研究

使用字符串方法,但这不起作用,可能是因为我正在从图像文件中读取,但数组方法确实有效。

const DISCLAIMER = "DONT DO THIS";
var data = "";
stdout.on('data', function(d){ 
           bufs+=d; 
         });
stdout.on('end', function(){
          var buf = Buffer.from(bufs);
          //// do work with the buffer here

          });
Run Code Online (Sandbox Code Playgroud)

当我执行 string 方法时,我从 npm jimp 收到此错误

var bufs = [];
stdout.on('data', function(d){ bufs.push(d); });
stdout.on('end', function(){
  var buf = Buffer.concat(bufs);
}
Run Code Online (Sandbox Code Playgroud)

基本上我认为从二进制到字符串的类型强制转换效果不太好。