标签: node-streams

Node.js将文本作为"spawnSync"的标准输入传递

我认为这很简单,但以下内容不能按预期工作.

我想wc从Node 管道数据到一个进程,比如说(只是一个用于说明的任意命令).

文档和其他SO问题似乎表明传递流应该工作:

const {spawnSync} = require('child_process')
const {Readable} = require('stream')

const textStream = new Readable()
textStream.push("one two three")
textStream.push(null)

const stdio = [textStream, process.stdout, process.stderr]
spawnSync('wc', ["-c"], { stdio })
Run Code Online (Sandbox Code Playgroud)

不幸的是,这会引发错误:

值"可读{...}对选项"stdio"无效

代码相关的位由internal/child_process.js不立即揭示了什么预期的有效选项.

node.js node-streams

7
推荐指数
1
解决办法
800
查看次数

错误 [ERR_STREAM_PREMATURE_CLOSE]:节点管道流中的过早关闭

我正在使用stream.pipelineNode的功能将一些数据上传到 S3。我正在实施的基本思想是从请求中提取文件并将它们写入 S3。我有一个pipeline可以提取 zip 文件并将它们成功写入 S3 的文件。但是,我希望我的第二个pipeline提出相同的请求,但解压缩并将解压缩的文件写入 S3。管道代码如下所示:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))
Run Code Online (Sandbox Code Playgroud)

s3Stream 函数如下所示:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}
Run Code Online (Sandbox Code Playgroud)

第一个pipeline运行良好,目前在生产中运行良好。但是,在添加第二个管道时,出现以下错误:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19) …
Run Code Online (Sandbox Code Playgroud)

etl amazon-s3 node.js node-modules node-streams

7
推荐指数
1
解决办法
2923
查看次数

如何使用对象列表作为gulp源流

我知道gulp需要一个乙烯基源流才能正常工作,但是有一种简单的方法可以使用已经存在的乙烯基文件或json对象而不是众所周知的仅采用globs的gulp.src吗?

javascript node.js gulp vinyl node-streams

6
推荐指数
1
解决办法
545
查看次数

转换流以将字符串添加到每一行

我产生了一个像这样的子进程:

const n = cp.spawn('bash');

n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)

我正在寻找一个转换流,以便我可以在孩子的每一行的开头添加类似'[child process]'的东西,所以我知道stdio来自孩子而不是父进程.

所以它看起来像:

const getTransformPrepender = function() : Transform {
   return ...
}

n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
Run Code Online (Sandbox Code Playgroud)

有没有人知道是否有这样的现有转换包或如何写一个?

我有这个:

import * as stream from 'stream';


export default function(pre: string){

  let saved = '';

  return new stream.Transform({

    transform(chunk, encoding, cb) {

      cb(null, String(pre) + String(chunk));
    },

    flush(cb) {
      this.push(saved);
      cb();
    }

  });

}
Run Code Online (Sandbox Code Playgroud)

但我担心它在边缘情况下不起作用 - 其中一个块突发可能不包含整行(对于很长的行).

看起来这里的答案就在这里:https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/

但是这个附录:https: //twitter.com/the1mills/status/886340747275812865

stdio node.js node-streams

6
推荐指数
2
解决办法
1021
查看次数

为什么尝试写大型文件导致js堆耗尽内存

这段代码

const file = require("fs").createWriteStream("./test.dat");
for(var i = 0; i < 1e7; i++){

    file.write("a");
}
Run Code Online (Sandbox Code Playgroud)

运行约30秒后出现此错误消息

<--- Last few GCs --->

[47234:0x103001400]    27539 ms: Mark-sweep 1406.1 (1458.4) -> 1406.1 (1458.4) MB, 2641.4 / 0.0 ms  allocation failure GC in old space requested
[47234:0x103001400]    29526 ms: Mark-sweep 1406.1 (1458.4) -> 1406.1 (1438.9) MB, 1986.8 / 0.0 ms  last resort GC in old spacerequested
[47234:0x103001400]    32154 ms: Mark-sweep 1406.1 (1438.9) -> 1406.1 (1438.9) MB, 2628.3 / 0.0 ms  last resort GC in …
Run Code Online (Sandbox Code Playgroud)

v8 node.js node-streams

6
推荐指数
1
解决办法
1892
查看次数

努力理解可读流上的 highWaterMark

我发现 Busboy 和节点流 highWaterMark 属性存在一些我不理解的行为。我预计块的大小最大为 highWaterMark 值,但块大小看起来不受 highWaterMark 设置的影响。

我已经fileHwm: 5在 Busboy 中设置了选项,并且我的快速路线设置如下

app.post('/upload', function(req, res, next) {
  req.pipe(req.busboy); // Pipe it through busboy
  req.busboy.on('file', (fieldname, file, filename) => {
      console.log(`Upload of '${filename}' started`);
      console.log(file);
      file.on('readable', () => {
        let chunk;
        while (null !== (chunk = file.read())) {
          console.log(`Received ${chunk.length} bytes of data.`);
        }
      });

  });
});
Run Code Online (Sandbox Code Playgroud)

当我登录时file,看起来不错,并且我看到该highWaterMark属性已设置得很好

FileStream {
  _readableState:
   ReadableState {
     objectMode: false,
     highWaterMark: 5,
...
Run Code Online (Sandbox Code Playgroud)

但我得到的块的大小并不5像我预期的那样 - 相反我看到

Received …
Run Code Online (Sandbox Code Playgroud)

node.js busboy node-streams

6
推荐指数
1
解决办法
1865
查看次数

可以让事件处理程序等到基于异步/Promise 的代码完成吗?

我在 Nodejs 模式下使用优秀的 Papa Parse 库,将超过 100 万行的大型(500 MB)CSV 文件流式传输到缓慢的持久性 API,该 API 一次只能接受一个请求。持久性 API 基于Promises,但从 Papa Parse,我在同步事件中收到每个解析的 CSV 行,如下所示:parseStream.on("data", row => { ... }

我面临的挑战是 Papa Parse 从流中转储 CSV 行的速度太快,以至于我缓慢的持久性 API 无法跟上。因为 Papa 是同步的,而我的 API 是基于Promise的,所以我不能只调用事件处理await doDirtyWork(row)程序on,因为同步和异步代码不会混合。

或者它们可以混合,但我只是不知道如何混合?

我的问题是,我可以让 Papa 的事件处理程序等待我的 API 调用完成吗?是不是直接在on("data")事件中执行持久性 API 请求,让on()函数以某种方式徘徊直到脏 API 工作完成?

就内存占用而言,我到目前为止的解决方案并不比使用 Papa 的非流模式好多少。实际上,我需要以生成器函数迭代的形式对大量事件进行排队。on("data")我还可以将 Promise 工厂排列在一个数组中,并在循环中进行处理。无论哪种方式,我最终都会将几乎整个 CSV 文件作为未来 Promise(承诺工厂)的巨大集合保存在内存中,直到我缓慢的 API 调用完全完成。

async importCSV(filePath) { …
Run Code Online (Sandbox Code Playgroud)

node.js promise backpressure es6-promise node-streams

6
推荐指数
1
解决办法
1612
查看次数

从 ReadableStream 创建 MediaStream

我正在使用puppeteer-stream来获取在服务器上运行的由 Node 控制的浏览器流。我可以毫无问题地将这个流写入文件。

我想通过 WebRTC 将此流传输到浏览器(基本上是为了查看浏览器实时运行的内容)。对于 webrtc,我尝试使用simple-peer,因为它已经为 Node 和浏览器端准备好了绑定。

但是,当我尝试将此流传递给 simple-peer 时,出现以下错误:

/Users/my_user/my_project/node_modules/simple-peer/index.js:286
    stream.getTracks().forEach(track => {
           ^

TypeError: stream.getTracks is not a function
    at Peer.addStream (/Users/my_user/my_project/node_modules/simple-peer/index.js:286:12)
Run Code Online (Sandbox Code Playgroud)

这是因为我拥有的 Stream 是 ReadableStream 但 simple-peer (或大多数 webrtc 库)需要MediaStream

如何将实时ReadableStream转换为可与 WebRTC 一起使用的MediaStream ?我找到了将MediaStreams转换为ReadableStreams的示例 ,例如这里,但反之亦然。

我在这里错过了什么吗?

node.js webrtc mediastream node-streams simple-peer

6
推荐指数
0
解决办法
1052
查看次数

Nodejs 管道异步流式传输

我对 Nodejs 流还很陌生,所以请耐心等待。

我正在使用节点流pipeline方法来制作流的副本(特别是节点获取的图像,它是可读流)。当我使用同步管道 api时,它工作得很好(例如,我看到管道成功消息控制台日志记录),但我需要使此过程异步,因为我一次处理(制作副本)多个图像。

当我将同步方法包装在 中时util.promisify,它只是永远挂起——承诺似乎永远处于挂起状态(甚至没有抛出一个错误,我可以看到这是最令人沮丧的部分)。我在这里错过了什么吗?接受解决方案的建议,甚至深入了解如何查看问题所在,因为我什至没有收到可以尝试调试的错误消息

这是代码/我尝试过的:

import util from 'util';
import { pipeline, PassThrough } from 'stream';

// synchronous 
// confirmed that this works
// the images get copied into the streams array, I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() => pipeline(response.body, new PassThrough(), 
  (err) => {
    if (err) {
       console.error('Pipeline failed.', err);
     } else {
       console.log('Pipeline succeeded.');
      }
    }
  )
);

// asynchronous
// this …
Run Code Online (Sandbox Code Playgroud)

node.js node-streams

6
推荐指数
1
解决办法
708
查看次数

从 AWS S3 SDK v3 中的 getObject 获取可读对象

我正在将使用 AWS S3 SDK v2 的库转换为 v3。

我的库是另一个库的接口的实现。另一个库提供抽象文件存储的接口(例如,在本地文件系统或云中存储)。

目前我有这个功能(v2 SDK):

public async getFileStream(filename: string): Promise<Readable> {
  return this.s3.getObject({
                Bucket: this.bucket,
                Key: filename,
            })
            .createReadStream();
}
Run Code Online (Sandbox Code Playgroud)

新的 v3 SDK 不再有createReadStream。我检查了互联网上的各个网站,他们都建议使用.Body.transformToWebStream().pipe. 我不能这样做,因为我需要返回Readable.

我尝试这样做(v3 SDK):

import { Readable } from 'node:stream';

public async getFileStream(filename: string): Promise<Readable> {
   return Readable.fromWeb(
       (await this.s3.getObject({
                    Bucket: this.bucket,
                    Key: filename,
              })
        ).Body.transformToWebStream()
   );
}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Argument of type 'ReadableStream<any>' is not assignable to parameter of type 'import("stream/web").ReadableStream<any>'.
  Type 'ReadableStream<any>' is missing the …
Run Code Online (Sandbox Code Playgroud)

node.js aws-sdk node-streams aws-sdk-js

6
推荐指数
1
解决办法
3526
查看次数