标签: node.js-stream

如何在NodeJS中模拟流

我正在尝试对我的一个node-js模块进行单元测试,这些模块在流中进行大量处理.我正在尝试模拟一个流(我将写入),因为在我的模块中我有".on('data/end)"我想要触发的监听器.基本上我希望能够做到这样的事情:

var mockedStream = new require('stream').readable();

mockedStream.on('data', function withData('data') {
  console.dir(data);
});

mockedStream.on('end', function() { 
  console.dir('goodbye');
});

mockedStream.push('hello world');
mockedStream.close();
Run Code Online (Sandbox Code Playgroud)

这会执行,但是'on'事件在我执行推送后永远不会被触发(并且.close()无效).

我可以在流上找到的所有指导使用'fs'或'net'库作为创建新流的基础(https://github.com/substack/stream-handbook),或者他们用sinon模拟它但是嘲笑变得非常冗长非常快.

有没有一种很好的方式来提供像这样的虚拟流?

unit-testing node.js node.js-stream

9
推荐指数
4
解决办法
1万
查看次数

Node.js Streams可读取转换

我一直在尝试使用可读和转换流来处理非常大的文件.我似乎遇到的问题是,如果我不在最后放置可写流,程序似乎在返回结果之前终止.

示例: rstream.pipe(split()).pipe(tstream)

tstream的发射器在计数器达到阈值时发出.当该阈值设置为较低的数字时,我得到一个结果,但是当它达到高时,它不会返回任何结果.如果我将它传递给文件编写器,它总是返回一个结果.我错过了一些明显的东西吗

码:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if i have a low counter for qtransformstream, 
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream); …
Run Code Online (Sandbox Code Playgroud)

stream node.js node.js-stream

8
推荐指数
1
解决办法
1366
查看次数

Node.js:分割n部分的流内容

我正在尝试了解节点流及其生命周期.所以,我想分割一个流的内容为n部分.下面的代码只是为了解释我的意图,并表明我已经自己尝试了一些东西.我省略了一些细节

我有一个流,它只生成一些数据(只是一系列数字):

class Stream extends Readable {
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    this.counter = 0
  }

  _read(size) {
    if(this.counter === 30) {
      this.push(null)
    } else {
      this.push(this.counter)
    }
    this.counter += 1
  }
}

const stream = new Stream()
stream.pause();
Run Code Online (Sandbox Code Playgroud)

试图占用下一个块的函数:

function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      var chunk;
      do {
        chunk = stream.read()
        if (_.isNull(chunk) || result.length > count) {
          stream.pause()
          break
        }
        result.push(chunk)
      } while(true)
      resolve(result)
    })
  })
} …
Run Code Online (Sandbox Code Playgroud)

javascript node.js node.js-stream

8
推荐指数
2
解决办法
1491
查看次数

将文件流式传输到html视频播放器,因为它是使用fs在Electron中下载的

  • 我目前正在尝试使用HTML视频播放器从Electron中的文件系统流式传输文件.

  • 我想在文件下载时开始流式传输.

  • 我不确定我目前的计划是否有效(或者如果可行的话).

计划

  • 从文件中创建可读文件流,该文件在下载文件时进行更新
  • 从该流生成blob网址
  • 使用该blob url作为视频源

我认为目前失败的地方是我在读取第一个块后生成blob url,但之后的任何块都不包含在blob url中.

这是关于我想做的事情(我知道这段代码不起作用)

const file = GrowingFile.open(downloadPath) // provides a readable stream for a file

let chunks = [];
file.on('data', (chunk) => {
  chunks.push(chunk);
  const blob = new Blob(chunks);
  const url = URL.createObjectURL(blob);

  video.src = url // continuously update the video src with a new blob url
})
Run Code Online (Sandbox Code Playgroud)

我的主要问题是:

有没有办法在从它生成网址后推送到blob列表并继续使用相同的blob网址?

javascript video-streaming node.js node.js-stream electron

8
推荐指数
1
解决办法
404
查看次数

Node.js网络库:从'data'事件中获取完整数据

我四处寻找,要么找不到我想要回答的确切问题,要么我需要有人向我解释,就像我5岁.

基本上,我有一个使用Net库的Node.js脚本.我正在连接多个主机,发送命令和侦听返回数据.

var net = require('net');

var nodes = [
    'HOST1,192.168.179.8',
    'HOST2,192.168.179.9',
    'HOST3,192.168.179.10',
    'HOST4,192.168.179.11'
];

function connectToServer(tid, ip) {
    var conn = net.createConnection(23, ip);
    conn.on('connect', function() {
        conn.write (login_string);  // login string hidden in pretend variable
    });
    conn.on('data', function(data) {
        var read = data.toString();
        if (read.match(/Login Successful/)) {
            console.log ("Connected to " + ip);
            conn.write(command_string);
        }

        else if (read.match(/Command OK/)) { // command_string returned successful,
                                                    // read until /\r\nEND\r\n/

                    // First part of data comes in here
            console.log("Got a response …
Run Code Online (Sandbox Code Playgroud)

javascript tcp node.js node.js-stream

7
推荐指数
2
解决办法
9470
查看次数

Node.js - 如何将流转换为字符串

我有流,我需要将流内容输入字符串.我使用http.get从互联网流式传输.我也写流文件,但我不想写文件,然后打开相同的文件并从中读取...所以我需要将流转换为字符串感谢所有的建议...

string stream node.js node.js-stream

7
推荐指数
1
解决办法
1万
查看次数

故障排除错误:在nodejs stream-adventure教程中连接ECONNREFUSED

我一直在研究learnyoujs和stream-adventure教程:

https://github.com/substack/stream-adventure

https://github.com/rvagg/learnyounode#learn-you-the-nodejs-for-much-win

我已经完成了第一套,大部分时间都在第二套,但我不断得到一个奇怪的错误...通常我可以让它消失.

这是命令/错误:

DEV/javascript/streamAdventure»stream-adventure运行httpserver.js

stream.js:94
  throw er; // Unhandled stream error in pipe.
        ^
Error: connect ECONNREFUSED
  at errnoException (net.js:901:11)
  at Object.afterConnect [as oncomplete] (net.js:892:19)
Run Code Online (Sandbox Code Playgroud)

这将启动但不会终止节点的进程,所以我ps aux | grep节点然后找到进程并将其杀死.

这是教程中的"工作"代码:

var http = require('http');
var through = require('through');

var server = http.createServer(function (req, res) {
    if (req.method === 'POST') {
        req.pipe(through(function (buf) {
            this.queue(buf.toString().toUpperCase());
        })).pipe(res);
    }
    else res.end('send me a POST\n');
});
server.listen(8000);
Run Code Online (Sandbox Code Playgroud)

如果我只是运行点头httpserver.js然后卷曲它,它工作正常....所以有谁有任何洞察到导致此错误的原因?

javascript http node.js node.js-stream

7
推荐指数
1
解决办法
2838
查看次数

node.js进程的额外stdio流

node.js API文档在生成子进程时使用额外的stdio(fd = 4):

// Open an extra fd=4, to interact with programs present a
// startd-style interface.
spawn('prg', [], { stdio: ['pipe', null, null, null, 'pipe'] });
Run Code Online (Sandbox Code Playgroud)

那个stdio可以通过父进程使用ChildProcess.stdio[fd].

子进程如何访问这些额外的stdios?让我们在文件描述符3(fd = 3)上使用流而不是管道.

/* parent process */

// open file for read/write
var mStream = fs.openSync('./shared-stream', 'r+');

// spawn child process with stream object as fd=3
spawn('node', ['/path/to/child.js'], {stdio: [0, 1, 2, mStream] });
Run Code Online (Sandbox Code Playgroud)

node.js node.js-stream

7
推荐指数
1
解决办法
1443
查看次数

json的node.js流数组以进行响应

我有一个REST方法,应该返回一个JSON数组,其中包含从mongodb中读取的某些元素(使用mongoose)。

它应该非常简单(在实际情况下,find方法具有参数,但这不是问题):

OutDataModel.find().stream({transform: JSON.stringify}).pipe(res);
Run Code Online (Sandbox Code Playgroud)

这种方法的问题是我没有得到有效的JSON,结果是这样的:

{"id":"1","score":11}{"id":"2","score":12}{"id":"3","score":13}
Run Code Online (Sandbox Code Playgroud)

我期待着这样:

[{"id":"1","score":11},{"id":"2","score":12},{"id":"3","score":13}]
Run Code Online (Sandbox Code Playgroud)

我还没有找到解决方案,但是我很确定会有一个简单的解决方案。

我尝试过的

没有什么让我引以为傲的,但事情就这样了。

  1. 在流之前写入'['响应。
  2. 我提供了另一个方法,而不是JSON.stringify,它调用JSON.stringify并','在末尾添加一个
  3. 在流的“结束”事件中,我写']'了响应。

仍然无法使用此“解决方案”,因为我在末尾有这样一个逗号:

 [{"id":"1","score":11},{"id":"2","score":12},{"id":"3","score":13},]
Run Code Online (Sandbox Code Playgroud)

正如我所说,我很确定应该有一个干净的解决方案,因为这应该很普遍。

此方法将有很多并发调用,因此我不想将所有内容都读取到内存中,然后再将所有内容写入响应中。每次调用都不会返回很多记录,但是所有这些记录在一起可能会非常庞大​​。使用者是一个带有spring的Java应用程序,它使用jackson解析JSON。

请让我知道该怎么做。

回答

通过按照接受的答案中的建议创建一个Transform流,使它起作用。

我的信息流看起来像这样:

var arraystream = new stream.Transform({objectMode: true});
arraystream._hasWritten = false;


arraystream._transform = function (chunk, encoding, callback) {
    console.log('_transform:' + chunk);
    if (!this._hasWritten) {
        this._hasWritten = true;
        this.push('[' + JSON.stringify(chunk));

    } else {
        this.push(',' + JSON.stringify(chunk));
    }
    callback();
};

arraystream._flush = function (callback) {
    console.log('_flush:');
    this.push(']');
    callback();

};
Run Code Online (Sandbox Code Playgroud)

以及使用它的代码:

OutDataModel.find().stream().pipe(arraystream).pipe(res);
Run Code Online (Sandbox Code Playgroud)

谢谢。

arrays rest json node.js node.js-stream

7
推荐指数
1
解决办法
3683
查看次数

Node.js"写完后"错误

从一个基本的节点应用程序开始,我无法弄清楚如何击败这个"写后结束"错误,即使在几个站点上建议的回调.

index.js:

var server = require("./server");
var router = require("./router");
var requestHandlers = require("./requestHandlers");

var handle = {}
handle["/"] = requestHandlers.start;
handle["/start"] = requestHandlers.start;
handle["/upload"] = requestHandlers.upload;

server.start(router.route, handle);
Run Code Online (Sandbox Code Playgroud)

server.js:

var http = require("http");
var url = require("url");

function start(route, handle) {
  function onRequest(request, response) {
    var postData ="";
    var pathname = url.parse(request.url).pathname;
    console.log("Request for " + pathname + " received.");

    route(handle, pathname, response);
   }

  http.createServer(onRequest).listen(8888);
  console.log("Server has started.");
}

exports.start = start;
Run Code Online (Sandbox Code Playgroud)

router.js:

function route(handle, pathname, response) {
  console.log("About …
Run Code Online (Sandbox Code Playgroud)

javascript node.js node.js-stream

7
推荐指数
1
解决办法
1万
查看次数