Ale*_*lls 6 stdio node.js node-streams
我产生了一个像这样的子进程:
const n = cp.spawn('bash');
n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)
我正在寻找一个转换流,以便我可以在孩子的每一行的开头添加类似'[child process]'的东西,所以我知道stdio来自孩子而不是父进程.
所以它看起来像:
const getTransformPrepender = function() : Transform {
return ...
}
n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)
有没有人知道是否有这样的现有转换包或如何写一个?
我有这个:
import * as stream from 'stream';
export default function(pre: string){
let saved = '';
return new stream.Transform({
transform(chunk, encoding, cb) {
cb(null, String(pre) + String(chunk));
},
flush(cb) {
this.push(saved);
cb();
}
});
}
Run Code Online (Sandbox Code Playgroud)
但我担心它在边缘情况下不起作用 - 其中一个块突发可能不包含整行(对于很长的行).
看起来这里的答案就在这里:https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/
但是这个附录:https: //twitter.com/the1mills/status/886340747275812865
您需要正确处理三种情况:
这是一个解决所有三种情况的算法描述
这是一个实际的实现,描述了为什么需要它等.
请注意,出于性能原因,我没有将Buffers转换为经典的JS字符串.
const { Transform } = require('stream')
const prefix = Buffer.from('[worker]: ')
const prepender = new Transform({
transform(chunk, encoding, done) {
this._rest = this._rest && this._rest.length
? Buffer.concat([this._rest, chunk])
: chunk
let index
// As long as we keep finding newlines, keep making slices of the buffer and push them to the
// readable side of the transform stream
while ((index = this._rest.indexOf('\n')) !== -1) {
// The `end` parameter is non-inclusive, so increase it to include the newline we found
const line = this._rest.slice(0, ++index)
// `start` is inclusive, but we are already one char ahead of the newline -> all good
this._rest = this._rest.slice(index)
// We have a single line here! Prepend the string we want
this.push(Buffer.concat([prefix, line]))
}
return void done()
},
// Called before the end of the input so we can handle any remaining
// data that we have saved
flush(done) {
// If we have any remaining data in the cache, send it out
if (this._rest && this._rest.length) {
return void done(null, Buffer.concat([prefix, this._rest])
}
},
})
process.stdin.pipe(prepender).pipe(process.stdout)
Run Code Online (Sandbox Code Playgroud)
您可以使用以下方法添加到流中:
https://github.com/ORESoftware/prepend-transform
但它旨在解决手头的问题,如下所示:
import pt from 'prepend-transform';
import * as cp from 'child_process';
const n = cp.spawn('bash');
n.stdout.pipe(pt('child stdout: ')).pipe(process.stdout);
n.stderr.pipe(pt('child stderr: ')).pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)