Tags: streaming, amazon-s3, amazon-web-services, node.js
The project I am currently working on requires multiple processes to upload data into a single file in S3. This data arrives from several sources in parallel, so to process every source as quickly as possible we will run multiple Node.js instances, each listening to one source. Because of memory and storage constraints, loading all the ingested data into memory, or storing it on disk and then performing a single upload, is not an option.
To work within these constraints I implemented a streaming upload: it buffers a small chunk of data from a single source and pipes the buffer into an upload stream. This works very well with a single Node.js process, but, as I mentioned, the goal is to process all sources in parallel. My first attempt was to open multiple streams to the same object key in the bucket. That simply overwrites the file with the data from whichever stream is the last one to close, so I discarded this option.
// code for the scenario above, where each process will open a separate stream to
// the same key and perform its own data ingestion and upload.
const stream = require('stream');
const AWS = require('aws-sdk');

function openStreamingUpload() {
  const s3 = new AWS.S3(/* s3 config */);
  const passThrough = new stream.PassThrough();
  const params = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    Body: passThrough
  };
  // upload() consumes the PassThrough stream until it is ended
  s3.upload(params).promise();
  return passThrough;
}

async function main() { // simulating a "never ending" flow of data
  const upload = openStreamingUpload();
  let data = await receiveData();
  do {
    upload.write(data);
    data = await receiveData();
  } while (data);
  upload.end();
}

main();
Next I tried the multipart upload offered by the S3 API. First, a single process creates a multipart upload, gets its ID and stores it in a shared space. After that, I tried to open one part upload on each of the Node.js processes the cluster will use, all with the same UploadId obtained beforehand. Each of those part uploads should have a stream piping the received data into it. The problem I ran into is that a part upload needs to know the part length in advance, and since I am piping a stream into it I cannot know when it will close or how much data it will carry, so its size cannot be computed. The code was inspired by this implementation.
const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

async function startMultipartUpload() {
  const multiPartParams = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket'
  };
  const multipart = await s3.createMultipartUpload(multiPartParams).promise();
  return multipart.UploadId;
}

async function finishMultipartUpload(multipartUploadId) {
  const finishingParams = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: multipartUploadId
    // completeMultipartUpload also expects the { PartNumber, ETag } list
    // of every uploaded part, omitted here for brevity
  };
  const data = await s3.completeMultipartUpload(finishingParams).promise();
  return data;
}

async function openMultipartStream(multipartUploadId) {
  const stream = require('stream');
  const passThrough = new stream.PassThrough();
  const params = {
    Body: passThrough,
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: multipartUploadId,
    PartNumber: 1 // placeholder: how do I know this part number when it's, in principle, unbounded?
  };
  s3.uploadPart(params).promise();
  return passThrough;
}

// a single process will start the multipart upload and share the UploadId
async function main() { // simulating a "never ending" flow of data
  const uploadId = await startMultipartUpload();
  const upload = await openMultipartStream(uploadId);
  let data = await receiveData();
  do {
    upload.write(data);
    data = await receiveData();
  } while (data);
  upload.end();
  // only the last process to close should finish the multipart upload
  await finishMultipartUpload(uploadId);
}

main(); // all the processes will receive and upload to the same UploadId
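For reference, since S3 requires every part except the last to be at least 5 MiB and part numbers to fall between 1 and 10000, one way to satisfy the part-length requirement would be to buffer each part fully before uploading it. A minimal sketch, assuming a hypothetical nextPartNumber() helper that atomically allocates part numbers from the shared space (e.g., Redis):

const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

const PART_SIZE = 5 * 1024 * 1024; // minimum size for every part except the last

// nextPartNumber() is a hypothetical helper that atomically hands out
// the next part number from a store shared by all processes.
async function uploadBufferedPart(uploadId, buffer) {
  const partNumber = await nextPartNumber(uploadId);
  const result = await s3.uploadPart({
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: uploadId,
    PartNumber: partNumber,
    Body: buffer,                 // a fully buffered part, so its length is known
    ContentLength: buffer.length
  }).promise();
  // completeMultipartUpload needs every { PartNumber, ETag } pair,
  // so each result.ETag must also be recorded in the shared space.
  return { PartNumber: partNumber, ETag: result.ETag };
}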
While searching around, I found an AWS article that introduces the upload() API method and says it abstracts the multipart API to allow large files to be uploaded by piping a stream of data. So I'd like to know whether it is possible to obtain the UploadId from a streaming "simple" upload, so that I could share this Id across the cluster and have every process upload to the same object while keeping the streaming behaviour. Has anyone tried this kind of "streaming multipart" upload scenario?
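A minimal sketch of that upload() usage, with its documented partSize and queueSize options for the managed multipart machinery (nothing in its public interface seems to surface the UploadId it creates internally):

const stream = require('stream');
const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

const passThrough = new stream.PassThrough();
const managed = s3.upload(
  { Key: 'final-s3-file.txt', Bucket: 'my-bucket', Body: passThrough },
  { partSize: 5 * 1024 * 1024, queueSize: 4 } // managed multipart part size and concurrency
);
managed.promise().then((data) => console.log('uploaded to', data.Location));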