Jas*_*mer 7 etl amazon-s3 node.js node-modules node-streams
I'm using Node's stream.pipeline functionality to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip the archive and write the decompressed files to S3. The pipeline code looks like this:
pipeline(request.get(...), s3Stream(zipFileWritePath))

pipeline(
  request.get(...),
  new unzipper.Parse(),
  etl.map(entry =>
    entry.pipe(s3Stream(createWritePath(writePath, entry)))
  )
)
The s3Stream function looks like this:
function s3Stream(file) {
  const pass = new stream.PassThrough()
  s3Store.upload(file, pass)
  return pass
}
The first pipeline works well and is currently running in production. However, when adding the second pipeline, I get the following error:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
Any ideas about what might be causing this, or solutions to fix it, would be greatly appreciated!
Sce*_*eat 10
When you use pipeline, you agree to consume the readable stream completely; nothing should stop before the readable has ended.

After spending some time dealing with these shenanigans, here is some more useful information.
import stream from 'stream'
const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()
s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))
stream.pipeline(
  s1,
  s2,
  s3,
  async s => { for await (const _ of s) { } },
  err => console.log('end', err)
)
Now if I call s2.end(), that stream and everything downstream of it finishes cleanly:
end 2
close 2
end 3
close 3
The pipeline is roughly equivalent to s3(s2(s1)).

But if I call s2.destroy() instead, it prints the following and destroys everything. That is your problem: a stream was destroyed before it ended normally, either by an error or by a return/break/throw inside an async generator or async function.
close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
at PassThrough.emit (events.js:327:22)
at emitCloseNT (internal/streams/destroy.js:81:10)
at processTicksAndRejections (internal/process/task_queues.js:83:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3
You cannot let any one of your streams fail without its error being caught. As the Node docs note: stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
Views: 2923