我认为这很简单,但以下内容不能按预期工作.
我想wc从Node 管道数据到一个进程,比如说(只是一个用于说明的任意命令).
const {spawnSync} = require('child_process')
const {Readable} = require('stream')
const textStream = new Readable()
textStream.push("one two three")
textStream.push(null)
const stdio = [textStream, process.stdout, process.stderr]
spawnSync('wc', ["-c"], { stdio })
Run Code Online (Sandbox Code Playgroud)
不幸的是,这会引发错误:
值"可读{...}对选项"stdio"无效
该代码相关的位由internal/child_process.js不立即揭示了什么预期的有效选项.
我正在使用stream.pipelineNode的功能将一些数据上传到 S3。我正在实施的基本思想是从请求中提取文件并将它们写入 S3。我有一个pipeline可以提取 zip 文件并将它们成功写入 S3 的文件。但是,我希望我的第二个pipeline提出相同的请求,但解压缩并将解压缩的文件写入 S3。管道代码如下所示:
pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))
Run Code Online (Sandbox Code Playgroud)
s3Stream 函数如下所示:
function s3Stream(file) {
const pass = new stream.PassThrough()
s3Store.upload(file, pass)
return pass
}
Run Code Online (Sandbox Code Playgroud)
第一个pipeline运行良好,目前在生产中运行良好。但是,在添加第二个管道时,出现以下错误:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19) …Run Code Online (Sandbox Code Playgroud) 我知道gulp需要一个乙烯基源流才能正常工作,但是有一种简单的方法可以使用已经存在的乙烯基文件或json对象而不是众所周知的仅采用globs的gulp.src吗?
我产生了一个像这样的子进程:
const n = cp.spawn('bash');
n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)
我正在寻找一个转换流,以便我可以在孩子的每一行的开头添加类似'[child process]'的东西,所以我知道stdio来自孩子而不是父进程.
所以它看起来像:
const getTransformPrepender = function() : Transform {
return ...
}
n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)
有没有人知道是否有这样的现有转换包或如何写一个?
我有这个:
import * as stream from 'stream';
export default function(pre: string){
let saved = '';
return new stream.Transform({
transform(chunk, encoding, cb) {
cb(null, String(pre) + String(chunk));
},
flush(cb) {
this.push(saved);
cb();
}
});
}
Run Code Online (Sandbox Code Playgroud)
但我担心它在边缘情况下不起作用 - 其中一个块突发可能不包含整行(对于很长的行).
看起来这里的答案就在这里:https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/
但是这个附录:https: //twitter.com/the1mills/status/886340747275812865
这段代码
const file = require("fs").createWriteStream("./test.dat");
for(var i = 0; i < 1e7; i++){
file.write("a");
}
Run Code Online (Sandbox Code Playgroud)
运行约30秒后出现此错误消息
<--- Last few GCs --->
[47234:0x103001400] 27539 ms: Mark-sweep 1406.1 (1458.4) -> 1406.1 (1458.4) MB, 2641.4 / 0.0 ms allocation failure GC in old space requested
[47234:0x103001400] 29526 ms: Mark-sweep 1406.1 (1458.4) -> 1406.1 (1438.9) MB, 1986.8 / 0.0 ms last resort GC in old spacerequested
[47234:0x103001400] 32154 ms: Mark-sweep 1406.1 (1438.9) -> 1406.1 (1438.9) MB, 2628.3 / 0.0 ms last resort GC in …Run Code Online (Sandbox Code Playgroud) 我发现 Busboy 和节点流 highWaterMark 属性存在一些我不理解的行为。我预计块的大小最大为 highWaterMark 值,但块大小看起来不受 highWaterMark 设置的影响。
我已经fileHwm: 5在 Busboy 中设置了选项,并且我的快速路线设置如下
app.post('/upload', function(req, res, next) {
req.pipe(req.busboy); // Pipe it through busboy
req.busboy.on('file', (fieldname, file, filename) => {
console.log(`Upload of '${filename}' started`);
console.log(file);
file.on('readable', () => {
let chunk;
while (null !== (chunk = file.read())) {
console.log(`Received ${chunk.length} bytes of data.`);
}
});
});
});
Run Code Online (Sandbox Code Playgroud)
当我登录时file,看起来不错,并且我看到该highWaterMark属性已设置得很好
FileStream {
_readableState:
ReadableState {
objectMode: false,
highWaterMark: 5,
...
Run Code Online (Sandbox Code Playgroud)
但我得到的块的大小并不5像我预期的那样 - 相反我看到
Received …Run Code Online (Sandbox Code Playgroud) 我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promises,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }
我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)程序on,因为同步和异步代码不会混合。
或者它们可以混合,但我只是不知道如何混合?
我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")事件中执行持久性 API 请求,让on()函数以某种方式徘徊直到脏 API 工作完成?
就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。
async importCSV(filePath) { …Run Code Online (Sandbox Code Playgroud) 我正在使用puppeteer-stream来获取在服务器上运行的由 Node 控制的浏览器流。我可以毫无问题地将这个流写入文件。
我想通过 WebRTC 将此流传输到浏览器(基本上是为了查看浏览器实时运行的内容)。对于 webrtc,我尝试使用simple-peer,因为它已经为 Node 和浏览器端准备好了绑定。
但是,当我尝试将此流传递给 simple-peer 时,出现以下错误:
/Users/my_user/my_project/node_modules/simple-peer/index.js:286
stream.getTracks().forEach(track => {
^
TypeError: stream.getTracks is not a function
at Peer.addStream (/Users/my_user/my_project/node_modules/simple-peer/index.js:286:12)
Run Code Online (Sandbox Code Playgroud)
这是因为我拥有的 Stream 是 ReadableStream ,但 simple-peer (或大多数 webrtc 库)需要MediaStream。
如何将实时ReadableStream转换为可与 WebRTC 一起使用的MediaStream ?我找到了将MediaStreams转换为ReadableStreams的示例 ,例如这里,但反之亦然。
我在这里错过了什么吗?
我对 Nodejs 流还很陌生,所以请耐心等待。
我正在使用节点流pipeline方法来制作流的副本(特别是节点获取的图像,它是可读流)。当我使用同步管道 api时,它工作得很好(例如,我看到管道成功消息控制台日志记录),但我需要使此过程异步,因为我一次处理(制作副本)多个图像。
当我将同步方法包装在 中时util.promisify,它只是永远挂起——承诺似乎永远处于挂起状态(甚至没有抛出一个错误,我可以看到这是最令人沮丧的部分)。我在这里错过了什么吗?接受解决方案的建议,甚至深入了解如何查看问题所在,因为我什至没有收到可以尝试调试的错误消息
这是代码/我尝试过的:
import util from 'util';
import { pipeline, PassThrough } from 'stream';
// synchronous
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
)
);
// asynchronous
// this …Run Code Online (Sandbox Code Playgroud) 我正在将使用 AWS S3 SDK v2 的库转换为 v3。
我的库是另一个库的接口的实现。另一个库提供抽象文件存储的接口(例如,在本地文件系统或云中存储)。
目前我有这个功能(v2 SDK):
public async getFileStream(filename: string): Promise<Readable> {
return this.s3.getObject({
Bucket: this.bucket,
Key: filename,
})
.createReadStream();
}
Run Code Online (Sandbox Code Playgroud)
新的 v3 SDK 不再有createReadStream。我检查了互联网上的各个网站,他们都建议使用.Body.transformToWebStream().pipe. 我不能这样做,因为我需要返回Readable.
我尝试这样做(v3 SDK):
import { Readable } from 'node:stream';
public async getFileStream(filename: string): Promise<Readable> {
return Readable.fromWeb(
(await this.s3.getObject({
Bucket: this.bucket,
Key: filename,
})
).Body.transformToWebStream()
);
}
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Argument of type 'ReadableStream<any>' is not assignable to parameter of type 'import("stream/web").ReadableStream<any>'.
Type 'ReadableStream<any>' is missing the …Run Code Online (Sandbox Code Playgroud) node-streams ×10
node.js ×10
amazon-s3 ×1
aws-sdk ×1
aws-sdk-js ×1
backpressure ×1
busboy ×1
es6-promise ×1
etl ×1
gulp ×1
javascript ×1
mediastream ×1
node-modules ×1
promise ×1
simple-peer ×1
stdio ×1
v8 ×1
vinyl ×1
webrtc ×1