Nodejs:在执行下一行之前如何等待管道和createWriteStream完成?

Kid*_*g_C 8 stream node.js async-await

我是 Nodejs 新手。我将一个 zip 文件从 S3 存储桶保存到托管 Nodejs 服务的 EC2 实例,然后在 EC2 的文件系统中本地解压缩该 zip 文件:


  const s3Item = await S3.getFile(objectBucket, objectKey);
  let stream = s3Item.Body.pipe(fs.createWriteStream(sourceFilePath, { mode: 0o777 }));
  stream.on('finish', () => {
    logger.info(` pipe done `);  
  });


  logger.info(`start decompressing...`);
  await decompressArchive(sourceFilePath, targetDirectory_decompressed);

  ... a lot of code...

Run Code Online (Sandbox Code Playgroud)

但是,start decompressing...总是在pipe done打印之前打印。

如何使这两个步骤同步,以便我们等到pipe done,然后开始解压缩?

我想使用 async/await ,因为我根本无法将所有其余代码放入块中stream.on('finish', () => {});,因为代码太多,并且它们都依赖于正在完成的流。

我已经搜索了相关答案(其中有很多),但我仍然无法使其工作。

jfr*_*d00 12

嗯,流是事件驱动的。您可以将要在流完成后运行的代码放入finish事件处理程序中:

  const s3Item = await S3.getFile(objectBucket, objectKey);
  let stream = s3Item.Body.pipe(fs.createWriteStream(sourceFilePath, { mode: 0o777 }));
  stream.on('finish', async () => {
    logger.info(` pipe done `);  
    logger.info(`start decompressing...`);
    await decompressArchive(sourceFilePath, targetDirectory_decompressed);
  });
Run Code Online (Sandbox Code Playgroud)

或者,您可以将finish事件包装在一个承诺中,并且await

  const s3Item = await S3.getFile(objectBucket, objectKey);
  let stream = s3Item.Body.pipe(fs.createWriteStream(sourceFilePath, { mode: 0o777 }));

  await new Promise((resolve, reject) => {
    stream.on('finish', () => {
      logger.info(` pipe done `);
      resolve();  
    }).on('error', err => {
      reject(err);
    });
  });

  logger.info(`start decompressing...`);
  await decompressArchive(sourceFilePath, targetDirectory_decompressed);
Run Code Online (Sandbox Code Playgroud)

仅供参考,最新版本的 nodejsonce()在事件模块中有一个函数,可以让这变得更容易一些:

  const { once } = require('events');

  const s3Item = await S3.getFile(objectBucket, objectKey);
  let stream = s3Item.Body.pipe(fs.createWriteStream(sourceFilePath, { mode: 0o777 }));

  await once(stream, 'finish');
  logger.info(` pipe done `);

  logger.info(`start decompressing...`);
  await decompressArchive(sourceFilePath, targetDirectory_decompressed);
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用 Promisified 版本pipeline()来代替.pipe(). 有很多方法可以做到这一点。