我正在尝试对我的一个node-js模块进行单元测试,这些模块在流中进行大量处理.我正在尝试模拟一个流(我将写入),因为在我的模块中我有".on('data/end)"我想要触发的监听器.基本上我希望能够做到这样的事情:
var mockedStream = new require('stream').readable();
mockedStream.on('data', function withData('data') {
console.dir(data);
});
mockedStream.on('end', function() {
console.dir('goodbye');
});
mockedStream.push('hello world');
mockedStream.close();
Run Code Online (Sandbox Code Playgroud)
这会执行,但是'on'事件在我执行推送后永远不会被触发(并且.close()无效).
我可以在流上找到的所有指导使用'fs'或'net'库作为创建新流的基础(https://github.com/substack/stream-handbook),或者他们用sinon模拟它但是嘲笑变得非常冗长非常快.
有没有一种很好的方式来提供像这样的虚拟流?
我一直在尝试使用可读和转换流来处理非常大的文件.我似乎遇到的问题是,如果我不在最后放置可写流,程序似乎在返回结果之前终止.
示例: rstream.pipe(split()).pipe(tstream)
我tstream的发射器在计数器达到阈值时发出.当该阈值设置为较低的数字时,我得到一个结果,但是当它达到高时,它不会返回任何结果.如果我将它传递给文件编写器,它总是返回一个结果.我错过了一些明显的东西吗
码:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream); …Run Code Online (Sandbox Code Playgroud) 我正在尝试了解节点流及其生命周期.所以,我想分割一个流的内容为n部分.下面的代码只是为了解释我的意图,并表明我已经自己尝试了一些东西.我省略了一些细节
我有一个流,它只生成一些数据(只是一系列数字):
class Stream extends Readable {
constructor() {
super({objectMode: true, highWaterMark: 1})
this.counter = 0
}
_read(size) {
if(this.counter === 30) {
this.push(null)
} else {
this.push(this.counter)
}
this.counter += 1
}
}
const stream = new Stream()
stream.pause();
Run Code Online (Sandbox Code Playgroud)
试图占用下一个块的函数:
function take(stream, count) {
const result = []
return new Promise(function(resolve) {
stream.once('readable', function() {
var chunk;
do {
chunk = stream.read()
if (_.isNull(chunk) || result.length > count) {
stream.pause()
break
}
result.push(chunk)
} while(true)
resolve(result)
})
})
} …Run Code Online (Sandbox Code Playgroud) 我目前正在尝试使用HTML视频播放器从Electron中的文件系统流式传输文件.
我想在文件下载时开始流式传输.
我不确定我目前的计划是否有效(或者如果可行的话).
我认为目前失败的地方是我在读取第一个块后生成blob url,但之后的任何块都不包含在blob url中.
这是关于我想做的事情(我知道这段代码不起作用)
const file = GrowingFile.open(downloadPath) // provides a readable stream for a file
let chunks = [];
file.on('data', (chunk) => {
chunks.push(chunk);
const blob = new Blob(chunks);
const url = URL.createObjectURL(blob);
video.src = url // continuously update the video src with a new blob url
})
Run Code Online (Sandbox Code Playgroud)
有没有办法在从它生成网址后推送到blob列表并继续使用相同的blob网址?
我四处寻找,要么找不到我想要回答的确切问题,要么我需要有人向我解释,就像我5岁.
基本上,我有一个使用Net库的Node.js脚本.我正在连接多个主机,发送命令和侦听返回数据.
var net = require('net');
var nodes = [
'HOST1,192.168.179.8',
'HOST2,192.168.179.9',
'HOST3,192.168.179.10',
'HOST4,192.168.179.11'
];
function connectToServer(tid, ip) {
var conn = net.createConnection(23, ip);
conn.on('connect', function() {
conn.write (login_string); // login string hidden in pretend variable
});
conn.on('data', function(data) {
var read = data.toString();
if (read.match(/Login Successful/)) {
console.log ("Connected to " + ip);
conn.write(command_string);
}
else if (read.match(/Command OK/)) { // command_string returned successful,
// read until /\r\nEND\r\n/
// First part of data comes in here
console.log("Got a response …Run Code Online (Sandbox Code Playgroud) 我有流,我需要将流内容输入字符串.我使用http.get从互联网流式传输.我也写流文件,但我不想写文件,然后打开相同的文件并从中读取...所以我需要将流转换为字符串感谢所有的建议...
我一直在研究learnyoujs和stream-adventure教程:
https://github.com/substack/stream-adventure
https://github.com/rvagg/learnyounode#learn-you-the-nodejs-for-much-win
我已经完成了第一套,大部分时间都在第二套,但我不断得到一个奇怪的错误...通常我可以让它消失.
这是命令/错误:
DEV/javascript/streamAdventure»stream-adventure运行httpserver.js
stream.js:94
throw er; // Unhandled stream error in pipe.
^
Error: connect ECONNREFUSED
at errnoException (net.js:901:11)
at Object.afterConnect [as oncomplete] (net.js:892:19)
Run Code Online (Sandbox Code Playgroud)
这将启动但不会终止节点的进程,所以我ps aux | grep节点然后找到进程并将其杀死.
这是教程中的"工作"代码:
var http = require('http');
var through = require('through');
var server = http.createServer(function (req, res) {
if (req.method === 'POST') {
req.pipe(through(function (buf) {
this.queue(buf.toString().toUpperCase());
})).pipe(res);
}
else res.end('send me a POST\n');
});
server.listen(8000);
Run Code Online (Sandbox Code Playgroud)
如果我只是运行点头httpserver.js然后卷曲它,它工作正常....所以有谁有任何洞察到导致此错误的原因?
node.js API文档在生成子进程时使用额外的stdio(fd = 4):
// Open an extra fd=4, to interact with programs present a
// startd-style interface.
spawn('prg', [], { stdio: ['pipe', null, null, null, 'pipe'] });
Run Code Online (Sandbox Code Playgroud)
那个stdio可以通过父进程使用ChildProcess.stdio[fd].
子进程如何访问这些额外的stdios?让我们在文件描述符3(fd = 3)上使用流而不是管道.
/* parent process */
// open file for read/write
var mStream = fs.openSync('./shared-stream', 'r+');
// spawn child process with stream object as fd=3
spawn('node', ['/path/to/child.js'], {stdio: [0, 1, 2, mStream] });
Run Code Online (Sandbox Code Playgroud) 我有一个REST方法,应该返回一个JSON数组,其中包含从mongodb中读取的某些元素(使用mongoose)。
它应该非常简单(在实际情况下,find方法具有参数,但这不是问题):
OutDataModel.find().stream({transform: JSON.stringify}).pipe(res);
Run Code Online (Sandbox Code Playgroud)
这种方法的问题是我没有得到有效的JSON,结果是这样的:
{"id":"1","score":11}{"id":"2","score":12}{"id":"3","score":13}
Run Code Online (Sandbox Code Playgroud)
我期待着这样:
[{"id":"1","score":11},{"id":"2","score":12},{"id":"3","score":13}]
Run Code Online (Sandbox Code Playgroud)
我还没有找到解决方案,但是我很确定会有一个简单的解决方案。
我尝试过的
没有什么让我引以为傲的,但事情就这样了。
'['响应。','在末尾添加一个']'了响应。仍然无法使用此“解决方案”,因为我在末尾有这样一个逗号:
[{"id":"1","score":11},{"id":"2","score":12},{"id":"3","score":13},]
Run Code Online (Sandbox Code Playgroud)
正如我所说,我很确定应该有一个干净的解决方案,因为这应该很普遍。
此方法将有很多并发调用,因此我不想将所有内容都读取到内存中,然后再将所有内容写入响应中。每次调用都不会返回很多记录,但是所有这些记录在一起可能会非常庞大。使用者是一个带有spring的Java应用程序,它使用jackson解析JSON。
请让我知道该怎么做。
回答
通过按照接受的答案中的建议创建一个Transform流,使它起作用。
我的信息流看起来像这样:
var arraystream = new stream.Transform({objectMode: true});
arraystream._hasWritten = false;
arraystream._transform = function (chunk, encoding, callback) {
console.log('_transform:' + chunk);
if (!this._hasWritten) {
this._hasWritten = true;
this.push('[' + JSON.stringify(chunk));
} else {
this.push(',' + JSON.stringify(chunk));
}
callback();
};
arraystream._flush = function (callback) {
console.log('_flush:');
this.push(']');
callback();
};
Run Code Online (Sandbox Code Playgroud)
以及使用它的代码:
OutDataModel.find().stream().pipe(arraystream).pipe(res);
Run Code Online (Sandbox Code Playgroud)
谢谢。
从一个基本的节点应用程序开始,我无法弄清楚如何击败这个"写后结束"错误,即使在几个站点上建议的回调.
index.js:
var server = require("./server");
var router = require("./router");
var requestHandlers = require("./requestHandlers");
var handle = {}
handle["/"] = requestHandlers.start;
handle["/start"] = requestHandlers.start;
handle["/upload"] = requestHandlers.upload;
server.start(router.route, handle);
Run Code Online (Sandbox Code Playgroud)
server.js:
var http = require("http");
var url = require("url");
function start(route, handle) {
function onRequest(request, response) {
var postData ="";
var pathname = url.parse(request.url).pathname;
console.log("Request for " + pathname + " received.");
route(handle, pathname, response);
}
http.createServer(onRequest).listen(8888);
console.log("Server has started.");
}
exports.start = start;
Run Code Online (Sandbox Code Playgroud)
router.js:
function route(handle, pathname, response) {
console.log("About …Run Code Online (Sandbox Code Playgroud) node.js ×10
node.js-stream ×10
javascript ×5
stream ×2
arrays ×1
electron ×1
http ×1
json ×1
rest ×1
string ×1
tcp ×1
unit-testing ×1