节点 - 在管道之后正确关闭流

Inf*_*911 6 javascript pipeline pipe stream node.js

假设我有以下代码:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;
           
                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}
Run Code Online (Sandbox Code Playgroud)

如何确保在每种情况下都正确关闭流?节点文档没有多大帮助——它们只是告诉我,我将有悬空事件侦听器:

Stream.pipeline() 将在所有流上调用stream.destroy(err),除了:

已发出“结束”或“关闭”的可读流。

已发出“完成”或“关闭”信号的可写流。

调用回调后,stream.pipeline() 在流上留下悬空事件侦听器。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞没错误。

eri*_*2k8 10

太长了;

  • pipe有那些问题
  • pipeline是为了解决所有这些问题而创建的,而且确实如此
  • pipeline如果从头到尾拥有所有部分就很好,但如果没有:
    • 节点的未来版本将有一个stream.compose功能来解决这个问题
    • 在那之前流链库是一个不错的选择

长篇大论的回答:

接受的答案只是被忽略了pipeline,但它是专门为解决这个问题而设计的。绝对遭受了它的痛苦(更多内容见下文),但我还没有发现没有正确关闭文件、http 等周围的流的pipe情况。YMMV 带有随机 npm 包,但如果它有一个或函数,以及一个事件,应该没问题。pipelineclosedestroyon('error'

为了演示,这会调用 shell 来查看我们的测试文件是否打开:

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");

  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};
Run Code Online (Sandbox Code Playgroud)

如果您在上面示例中的循环内调用它:

for await (const chunk of source) {
  await listOpenFiles();
Run Code Online (Sandbox Code Playgroud)

输出将不断重复:

***** open files:
[
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]
Run Code Online (Sandbox Code Playgroud)

如果您在捕获后再次调用它,您可以看到所有内容都已关闭。

***** open files:
 [] 
Run Code Online (Sandbox Code Playgroud)

关于引用的文档:

文档pipeline在前两个要点中提到的是,它不会关闭已经关闭的流,因为……好吧,它们已经关闭了。至于悬空的侦听器,它们确实留在传递给的各个流上pipeline。但是,在您的示例(典型情况)中,您无论如何都不会保留对各个流的引用;管道完成后,它们将立即被垃圾收集。例如,如果您经常引用其中一个副作用,则这是有关潜在副作用的警告。

***** open files:
[
  '/path/to/lowercase.txt',
  '/path/to/uppercase.txt'
]
Run Code Online (Sandbox Code Playgroud)

相反,最好拥有“干净”的实例。现在生成器函数很容易链接,甚至引用转换的情况也不太常见,但您可以简单地创建一个返回新实例而不是常量实例的函数:

***** open files:
 [] 
Run Code Online (Sandbox Code Playgroud)

简而言之,上面的例子在这三点上都很好。

更多信息:pipe

pipe另一方面,确实存在上述传播问题。

// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
Run Code Online (Sandbox Code Playgroud)

人们普遍认为这很痛苦/不直观,但现在改变它已经太晚了。我会尽可能避免它,并pipeline尽可能推荐它。然而,需要注意的是,这pipeline需要将所有部分放在一起。Writable因此,例如对于上述内容,您还需要最终目标。pipe如果您只想构建链的一部分,您仍然必须在这种情况下使用。解决这个问题的方法更容易单独推理:

export const createCaptilizer = () => new Transform(// ...
Run Code Online (Sandbox Code Playgroud)

不过,有好消息。它仍处于实验阶段,但很快 Stream 将公开一个stream.compose函数。它具有 的所有传播/清理优点pipeline,但仅返回一个新流。本质上,这就是大多数人认为会做的事情pipe。;)

const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};
Run Code Online (Sandbox Code Playgroud)

在那之前,请查看https://www.npmjs.com/package/stream-chain

pipeline关于和的注释await

请注意,上面的示例使用await pipeline(//...,但链接的文档是同步版本。这不会返回一个承诺,所以await什么也不做。从节点 15 开始,您通常需要stream/promises此处的 api: https: //nodejs.org/api/stream.html#streams-promises-api

const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());

  return transform;
}
Run Code Online (Sandbox Code Playgroud)

在节点 15 之前,您可以使用 util's 做出承诺promisify

// NO propagation or cleanup
readable.pipe(transform);

// automatic propagation and cleanup
stream.compose(readable, transform);
Run Code Online (Sandbox Code Playgroud)

或者,为了使整个文件变得更简单:

import { pipeline } from 'stream/promises'; // NOT 'stream'
Run Code Online (Sandbox Code Playgroud)

我之所以提到这一点,是因为,如果您await与同步版本一起使用,它实际上不会在 后完成try/catch,因此可能会给人一种错误的印象,即它无法清理,而实际上它尚未完成。


jfr*_*d00 8

因此,我发现许多 Node.js 流复合操作(例如pipeline()和 ).pipe()在错误处理方面非常糟糕/不完整。例如,如果您只是这样做:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });
Run Code Online (Sandbox Code Playgroud)

您可能会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“不”,事实并非如此。打开该输入文件时出现的错误将无法处理。有一些逻辑,因为.pipe()返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。该.pipe()操作可以侦听输入流上的错误并传递错误(即使它是 apipeErr或不同的东西),然后它也可以在读取错误时正确清理 writeStream。但是,并.pipe()没有那么彻底地实施。它似乎想假设输入流上永远不会出现错误。

相反,您必须单独保存 readStream 对象并直接向其附加错误处理程序才能查看该错误。所以,我只是不再相信这种复合的东西,并且文档从未真正解释如何进行正确的错误处理。我试图查看代码,pipeline()看看我是否能理解错误处理,但这并没有被证明是一个富有成果的努力。

因此,您的特定问题似乎可以通过转换流来完成:

const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。