将流传递给s3.upload()

wom*_*omp 61 amazon-s3 amazon-web-services node.js node-streams

我目前正在使用名为s3-upload-stream的node.js插件非常大的文件流式传输到Amazon S3.它使用多部分API,并且在大多数情况下它运行良好.

但是,这个模块显示了它的年龄,我已经不得不对它进行修改(作者也弃用了它).今天我遇到了亚马逊的另一个问题,我真的想接受作者的推荐并开始使用官方的aws-sdk来完成我的上传.

但.

官方SDK似乎不支持管道s3.upload().s3.upload的本质是您必须将可读流作为参数传递给S3构造函数.

我有大约120多个用户代码模块进行各种文件处理,并且它们与输出的最终目的地无关.引擎为它们提供了一个可管理的可写输出流,然后它们就会输出它.我无法将AWS.S3它们交给对象,并要求它们upload()在不向所有模块添加代码的情况下调用它.我使用的原因s3-upload-stream是因为它支持管道.

有没有办法制作aws-sdk s3.upload()我能管道流的东西?

小智 95

upload()使用node.js stream.PassThrough()流包装S3 函数.

这是一个例子:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
Run Code Online (Sandbox Code Playgroud)

  • 当你这样做时,你的PassThrough流是否关闭?我有一段时间在s3.upload中关闭我的PassThrough流. (5认同)
  • 上传文件的大小为0字节.如果我将相同的数据从源流传输到文件系统,那么一切正常.任何的想法? (5认同)
  • 直通流将接收写入的字节并将其输出。这使您可以返回可写流,当您对其进行写入时,aws-sdk将读取该可写流。我还要从s3.upload()返回响应对象,因为否则您不能确保上传完成。 (3认同)
  • 太好了,这解决了我非常丑陋的问题=-)您能解释一下stream.PassThrough()实际做什么吗? (2认同)
  • 这不是与将可读流传递给 Body 相同但需要更多代码吗?AWS SDK 仍将在 PassThrough 流上调用 read(),因此没有真正的管道一直到 S3。唯一的区别是中间有一个额外的流。 (2认同)

Ahm*_*tin 44

有点迟到的答案,它可能有希望帮助别人.您可以返回可写流和承诺,这样您就可以在上载完成时获取响应数据.

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}
Run Code Online (Sandbox Code Playgroud)

您可以使用以下功能:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

readStream.pipe(writeStream);
promise.then(console.log);
Run Code Online (Sandbox Code Playgroud)


sr9*_*yar 41

我认为值得更新 AWS SDK v3 的答案:)。

S3 客户端不再具有upload功能,@aws-sdk/lib-storage 建议按照https://github.com/aws/aws-sdk-js-v3/blob/main/lib/lib-storage/README.md使用该包

因此,生成的代码片段应如下所示:

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
const stream = require('stream');

...

const client = new S3Client({
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  },
  region: process.env.AWS_DEFAULT_REGION,
});

...

async function uploadStream(readableStream) {

  const Key = 'filename.pdf'; 
  const Bucket = 'bucket-name';
  const passThroughStream = new stream.PassThrough();

  let res;

  try {
    const parallelUploads3 = new Upload({
      client,
      params: {
        Bucket,
        Key,
        Body: passThroughStream,
        ACL:'public-read',
      },
      queueSize: 4,
      partSize: 1024 * 1024 * 5,
      leavePartsOnError: false,
    });

    readableStream.pipe(passThroughStream);
    res = await parallelUploads3.done();
  } catch (e) {
    console.log(e);
  }

  return res;
}
Run Code Online (Sandbox Code Playgroud)

  • 真正的MVP就在这里!您也可以只设置 `Body: readableStream` (2认同)

tsu*_*suz 36

在接受的答案中,该功能在上传完成之前结束,因此,它是不正确的.下面的代码从可读流中正确管道.

上传参考

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}
Run Code Online (Sandbox Code Playgroud)

您还可以更进一步,使用以下方式输出进度信息ManagedUpload:

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
Run Code Online (Sandbox Code Playgroud)

ManagedUpload引用

可用事件列表

  • 这表示接受的答案是不完整的,但是对于s3.upload的管道不起作用,如@Womp的更新帖子中所示.如果更新此答案以获取其他内容的管道输出,那将非常有用! (7认同)
  • @tsuz,尝试实施您的解决方案给我一个错误:`TypeError: dest.on is not a function`,知道为什么吗? (2认同)

cor*_*opy 9

没有一个答案对我有用,因为我想:

  • 管道进入 s3.upload()
  • 将 的结果通过管道s3.upload()输送到另一个流中

接受的答案不做后者。其他的依赖promise api,这在使用流管道时很麻烦。

这是我对已接受答案的修改。

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})
Run Code Online (Sandbox Code Playgroud)


小智 7

Type Script 解决方案:
此示例使用:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";
Run Code Online (Sandbox Code Playgroud)

和异步功能:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}
Run Code Online (Sandbox Code Playgroud)

在某处调用此方法,例如:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
Run Code Online (Sandbox Code Playgroud)


emi*_*ich 7

遵循其他答案并使用最新的适用于 Node.js 的 AWS 开发工具包,有一个更干净、更简单的解决方案,因为 s3 upload() 函数使用等待语法和 S3 的承诺接受流:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
Run Code Online (Sandbox Code Playgroud)


mat*_*yer 5

如果它对任何人有帮助,我就能够成功地从客户端流式传输到 s3:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

服务器端代码假设req是一个流对象,在我的例子中,它是从客户端发送的,并在标头中设置了文件信息。

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};
Run Code Online (Sandbox Code Playgroud)

是的,它打破了惯例,但如果你看一下要点,它比我发现的使用 multer、busboy 等的任何东西都要干净得多......

+1 表示实用主义,并感谢@SalehenRahman 的帮助。


Tim*_*Tim 5

对于那些抱怨当他们使用 s3 api 上传功能并且零字节文件最终出现在 s3 上的人(@Radar155 和 @gabo) - 我也遇到了这个问题。

创建第二个 PassThrough 流,并将所有数据从第一个流传输到第二个流,并将对该第二个流的引用传递给 s3。您可以通过几种不同的方式来做到这一点 - 可能一种肮脏的方式是侦听第一个流上的“数据”事件,然后将相同的数据写入第二个流 - 与“结束”事件类似 - 只需调用第二个流上的结束函数。我不知道这是否是 aws api、节点版本或其他问题中的错误 - 但它为我解决了这个问题。

它可能是这样的:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
Run Code Online (Sandbox Code Playgroud)


小智 5

The thing here to note in the most accepted answer above is that: You need to return the pass in the function if you are using pipe like,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}
Run Code Online (Sandbox Code Playgroud)

否则它会默默地移动到下一个而不会抛出错误,或者会抛出错误,TypeError: dest.on is not a function具体取决于您如何编写函数