CSE*_*SEO 6 node.js node-streams
我对 Nodejs 流还很陌生,所以请耐心等待。
我正在使用节点流pipeline方法来制作流的副本(特别是节点获取的图像,它是可读流)。当我使用同步管道 api时,它工作得很好(例如,我看到管道成功消息控制台日志记录),但我需要使此过程异步,因为我一次处理(制作副本)多个图像。
当我将同步方法包装在 中时util.promisify,它只是永远挂起——承诺似乎永远处于挂起状态(甚至没有抛出一个错误,我可以看到这是最令人沮丧的部分)。我在这里错过了什么吗?接受解决方案的建议,甚至深入了解如何查看问题所在,因为我什至没有收到可以尝试调试的错误消息
这是代码/我尝试过的:
import util from 'util';
import { pipeline, PassThrough } from 'stream';
// synchronous
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
);
// asynchronous
// this does not work, I never get any console logs and the promise just hangs forever
const pipelineAsync = util.promisify(pipeline);
const streams = await Promise.all(allKeys.map(async () => {
try {
const stream = await pipelineAsync(
response.body,
new PassThrough()
)
console.log('Pipeline succeeded.');
return stream;
} catch(e) {
console.log('error!', e)
return null;
}
}
));
Run Code Online (Sandbox Code Playgroud)
另外,如果这有帮助,这里有一些注释:
我PassThrough在这里使用是因为这些流也必须可读/上传到 S3(不包含在上面的代码片段中)
另外我在节点 16 上,所以我也尝试使用 frompipeline并stream/promises得到相同的悬挂结果
我还尝试使用同步pipelineapi 并将其手动包装在一个 Promise 中,如下所示:
import { pipeline } from 'stream';
const streams = await Promise.all(allKeys.map(async () => {
console.log('inside the map loop!')
return new Promise((resolve, reject) => {
console.log('we are inside the promise!');
return pipeline(
response.body,
new PassThrough(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
return reject();
} else {
console.log('Pipeline succeeded.');
return resolve(undefined);
}
}
)
})
}
));
Run Code Online (Sandbox Code Playgroud)
我仍然得到相同的挂起行为(我得到“在地图循环内”和“我们在承诺内”控制台日志,但从未到达管道失败或成功的控制台日志。
要将一个可读流“管道”到多个可写流中,请在可读流的事件处理程序中执行多个write命令,对于该事件执行类似的命令:dataend
var blocked = 0;
function resume() {
if (--blocked === 0) readable.resume();
}
writable1.on("drain", resume);
writable2.on("drain", resume);
readable.on("data", function(chunk) {
if (!writable1.write(chunk)) blocked++;
if (!writable2.write(chunk)) blocked++;
if (blocked > 0) readable.pause();
})
.on("end", function() {
writable1.end();
writable2.end();
});
Run Code Online (Sandbox Code Playgroud)
由于data事件处理程序是同步的,但写入是异步操作,因此即使可写流仍忙于消耗先前的数据块从而使其缓冲区溢出,事件处理程序也可能会继续写入数据块。为了避免这种情况,如果一个可写流的write方法返回false(意味着它想要阻止更多的数据块),则可读流将暂停,并且只有在所有被阻止的可写流再次解除阻止并发出事件后才恢复drain。
还必须考虑错误处理。如果一个可写流失败,其他流是否应该继续?无论如何,如果可读流失败,所有可写流都会被销毁:
readable.on("error", function(err) {
writable1.destroy(err);
writable2.destroy(err);
});
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
708 次 |
| 最近记录: |