Nodejs 管道异步流式传输

CSE*_*SEO 6 node.js node-streams

我对 Nodejs 流还很陌生,所以请耐心等待。

我正在使用节点流pipeline方法来制作流的副本(特别是节点获取的图像,它是可读流)。当我使用同步管道 api时,它工作得很好(例如,我看到管道成功消息控制台日志记录),但我需要使此过程异步,因为我一次处理(制作副本)多个图像。

当我将同步方法包装在 中时util.promisify,它只是永远挂起——承诺似乎永远处于挂起状态(甚至没有抛出一个错误,我可以看到这是最令人沮丧的部分)。我在这里错过了什么吗?接受解决方案的建议,甚至深入了解如何查看问题所在,因为我什至没有收到可以尝试调试的错误消息

这是代码/我尝试过的:

import util from 'util';
import { pipeline, PassThrough } from 'stream';

// synchronous 
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(), 
  (err) => {
    if (err) {
       console.error('Pipeline failed.', err);
     } else {
       console.log('Pipeline succeeded.');
      }
    }
  )
);

// asynchronous
// this does not work, I never get any console logs and the promise just hangs forever
const pipelineAsync = util.promisify(pipeline);
  const streams = await Promise.all(allKeys.map(async () => {
    try {
      const stream = await pipelineAsync(
        response.body,
        new PassThrough()
       )
      console.log('Pipeline succeeded.');
      return stream;
    } catch(e) {
      console.log('error!', e)
      return null;
    }
  }
));
Run Code Online (Sandbox Code Playgroud)

另外,如果这有帮助,这里有一些注释:

  1. PassThrough在这里使用是因为这些流也必须可读/上传到 S3(不包含在上面的代码片段中)

  2. 另外我在节点 16 上,所以我也尝试使用 frompipelinestream/promises得到相同的悬挂结果

  3. 我还尝试使用同步pipelineapi 并将其手动包装在一个 Promise 中,如下所示:

import { pipeline } from 'stream';

const streams = await Promise.all(allKeys.map(async () => {
  console.log('inside the map loop!')
  return new Promise((resolve, reject) => {
    console.log('we are inside the promise!');
    return pipeline(
      response.body,
      new PassThrough(),
      (err) => {
         if (err) {
           console.error('Pipeline failed.', err);
           return reject();
         } else {
          console.log('Pipeline succeeded.');
          return resolve(undefined);
         }
       }
     )
   })
 }
));
Run Code Online (Sandbox Code Playgroud)

我仍然得到相同的挂起行为(我得到“在地图循环内”和“我们在承诺内”控制台日志,但从未到达管道失败或成功的控制台日志。

Hei*_*ßen 0

要将一个可读流“管道”到多个可写流中,请在可读流的事件处理程序中执行多个write命令,对于该事件执行类似的命令:dataend

var blocked = 0;
function resume() {
  if (--blocked === 0) readable.resume();
}
writable1.on("drain", resume);
writable2.on("drain", resume);
readable.on("data", function(chunk) {
  if (!writable1.write(chunk)) blocked++;
  if (!writable2.write(chunk)) blocked++;
  if (blocked > 0) readable.pause();
})
.on("end", function() {
  writable1.end();
  writable2.end();
});
Run Code Online (Sandbox Code Playgroud)

由于data事件处理程序是同步的,但写入是异步操作,因此即使可写流仍忙于消耗先前的数据块从而使其缓冲区溢出,事件处理程序也可能会继续写入数据块。为了避免这种情况,如果一个可写流的write方法返回false(意味着它想要阻止更多的数据块),则可读流将暂停,并且只有在所有被阻止的可写流再次解除阻止并发出事件后才恢复drain

还必须考虑错误处理。如果一个可写流失败,其他流是否应该继续?无论如何,如果可读流失败,所有可写流都会被销毁:

readable.on("error", function(err) {
  writable1.destroy(err);
  writable2.destroy(err);
});
Run Code Online (Sandbox Code Playgroud)