Inf*_*911 6 javascript pipeline pipe stream node.js
假设我有以下代码:
try {
let size = 0;
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source) {
for await (const chunk of source) {
size += chunk.length;
if (size >= 1000000) {
throw new Error('File is too big');
}
yield String(chunk).toUpperCase();
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
} catch (error) {
console.log('got error:', error);
}
Run Code Online (Sandbox Code Playgroud)
如何确保在每种情况下都正确关闭流?节点文档没有多大帮助——它们只是告诉我,我将有悬空事件侦听器:
Stream.pipeline() 将在所有流上调用stream.destroy(err),除了:
已发出“结束”或“关闭”的可读流。
已发出“完成”或“关闭”信号的可写流。
调用回调后,stream.pipeline() 在流上留下悬空事件侦听器。在失败后重用流的情况下,这可能会导致事件侦听器泄漏和吞没错误。
eri*_*2k8 10
太长了;
pipe有那些问题pipeline是为了解决所有这些问题而创建的,而且确实如此pipeline如果从头到尾拥有所有部分就很好,但如果没有:
stream.compose功能来解决这个问题接受的答案只是被忽略了pipeline,但它是专门为解决这个问题而设计的。绝对遭受了它的痛苦(更多内容见下文),但我还没有发现没有正确关闭文件、http 等周围的流的pipe情况。YMMV 带有随机 npm 包,但如果它有一个或函数,以及一个事件,应该没问题。pipelineclosedestroyon('error'
为了演示,这会调用 shell 来查看我们的测试文件是否打开:
const listOpenFiles = async () => {
const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");
// only show our test files
const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
console.log('***** open files:\n', openFiles, '\n-------------');
};
Run Code Online (Sandbox Code Playgroud)
如果您在上面示例中的循环内调用它:
for await (const chunk of source) {
await listOpenFiles();
Run Code Online (Sandbox Code Playgroud)
输出将不断重复:
***** open files:
[
'/path/to/lowercase.txt',
'/path/to/uppercase.txt'
]
Run Code Online (Sandbox Code Playgroud)
如果您在捕获后再次调用它,您可以看到所有内容都已关闭。
***** open files:
[]
Run Code Online (Sandbox Code Playgroud)
文档pipeline在前两个要点中提到的是,它不会关闭已经关闭的流,因为……好吧,它们已经关闭了。至于悬空的侦听器,它们确实留在传递给的各个流上pipeline。但是,在您的示例(典型情况)中,您无论如何都不会保留对各个流的引用;管道完成后,它们将立即被垃圾收集。例如,如果您经常引用其中一个副作用,则这是有关潜在副作用的警告。
***** open files:
[
'/path/to/lowercase.txt',
'/path/to/uppercase.txt'
]
Run Code Online (Sandbox Code Playgroud)
相反,最好拥有“干净”的实例。现在生成器函数很容易链接,甚至引用转换的情况也不太常见,但您可以简单地创建一个返回新实例而不是常量实例的函数:
***** open files:
[]
Run Code Online (Sandbox Code Playgroud)
简而言之,上面的例子在这三点上都很好。
pipepipe另一方面,确实存在上述传播问题。
// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
Run Code Online (Sandbox Code Playgroud)
人们普遍认为这很痛苦/不直观,但现在改变它已经太晚了。我会尽可能避免它,并pipeline尽可能推荐它。然而,需要注意的是,这pipeline需要将所有部分放在一起。Writable因此,例如对于上述内容,您还需要最终目标。pipe如果您只想构建链的一部分,您仍然必须在这种情况下使用。解决这个问题的方法更容易单独推理:
export const createCaptilizer = () => new Transform(// ...
Run Code Online (Sandbox Code Playgroud)
不过,有好消息。它仍处于实验阶段,但很快 Stream 将公开一个stream.compose函数。它具有 的所有传播/清理优点pipeline,但仅返回一个新流。本质上,这就是大多数人认为会做的事情pipe。;)
const csvStream = (file) => {
// does not expose file errors, nor clean up the file stream on parsing errors!!!
return fs.createReadStream(file).pipe(createCsvTransform());
};
Run Code Online (Sandbox Code Playgroud)
在那之前,请查看https://www.npmjs.com/package/stream-chain
pipeline关于和的注释await请注意,上面的示例使用await pipeline(//...,但链接的文档是同步版本。这不会返回一个承诺,所以await什么也不做。从节点 15 开始,您通常需要stream/promises此处的 api: https: //nodejs.org/api/stream.html#streams-promises-api
const csvStream = (file) => {
const fileStream = fs.createReadStream(file);
const transform = createCsvTransform();
// pass file errors forward
fileStream.on('error', (error) => transform.emit('error', error));
// close file stream on parsing errors
transform.on('error', () => fileStream.close());
return transform;
}
Run Code Online (Sandbox Code Playgroud)
在节点 15 之前,您可以使用 util's 做出承诺promisify:
// NO propagation or cleanup
readable.pipe(transform);
// automatic propagation and cleanup
stream.compose(readable, transform);
Run Code Online (Sandbox Code Playgroud)
或者,为了使整个文件变得更简单:
import { pipeline } from 'stream/promises'; // NOT 'stream'
Run Code Online (Sandbox Code Playgroud)
我之所以提到这一点,是因为,如果您await与同步版本一起使用,它实际上不会在 后完成try/catch,因此可能会给人一种错误的印象,即它无法清理,而实际上它尚未完成。
因此,我发现许多 Node.js 流复合操作(例如pipeline()和 ).pipe()在错误处理方面非常糟糕/不完整。例如,如果您只是这样做:
fs.createReadStream("input.txt")
.pipe(fs.createWriteStream("output.txt"))
.on('error', err => {
console.log(err);
}).on('finish', () => {
console.log("all done");
});
Run Code Online (Sandbox Code Playgroud)
您可能会期望,如果打开 readStream 时出现错误,您会在此处的错误处理程序中收到该错误,但“不”,事实并非如此。打开该输入文件时出现的错误将无法处理。有一些逻辑,因为.pipe()返回输出流并且输入错误不是输出流上的错误,但是当它没有通过时,很容易错过输入流上的错误。该.pipe()操作可以侦听输入流上的错误并传递错误(即使它是 apipeErr或不同的东西),然后它也可以在读取错误时正确清理 writeStream。但是,并.pipe()没有那么彻底地实施。它似乎想假设输入流上永远不会出现错误。
相反,您必须单独保存 readStream 对象并直接向其附加错误处理程序才能查看该错误。所以,我只是不再相信这种复合的东西,并且文档从未真正解释如何进行正确的错误处理。我试图查看代码,pipeline()看看我是否能理解错误处理,但这并没有被证明是一个富有成果的努力。
因此,您的特定问题似乎可以通过转换流来完成:
const fs = require('fs');
const { Transform } = require('stream');
const myTransform = new Transform({
transform: function(chunk, encoding, callback) {
let str = chunk.toString('utf8');
this.push(str.toUpperCase());
callback();
}
});
function upperFile(input, output) {
return new Promise((resolve, reject) => {
// common function for cleaning up a partial output file
function errCleanup(err) {
fs.unlink(output, function(e) {
if (e) console.log(e);
reject(err);
});
}
let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
let outputStream = fs.createWriteStream(output, {emitClose: true});
// have to separately listen for read/open errors
inputStream.on("error", err => {
// have to manually close writeStream when there was an error reading
if (outputStream) outputStream.destroy();
errCleanup(err);
});
inputStream.pipe(myTransform)
.pipe(outputStream)
.on("error", errCleanup)
.on("close", resolve);
});
}
// sample usage
upperFile("input.txt", "output.txt").then(() => {
console.log("all done");
}).catch(err => {
console.log("got error", err);
});
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,大约 2/3 的代码以稳健的方式处理错误(内置操作无法正确执行的部分)。
| 归档时间: |
|
| 查看次数: |
8939 次 |
| 最近记录: |