var fs = require('fs');
var file = fs.createReadStream('./zeros.txt');
var dataSize = 0;
file.on('readable', function () {
var data = file.read(10);
console.log('readable size = ', data.length);
console.log(data.toString());
});
Run Code Online (Sandbox Code Playgroud)
文件"zeros.txt"包含700个字符"0"
据我所知,在调用read(10)之后,stream必须停止并等待新的read()调用.但是,调用的结果是:
readable size = 10
0000000000
readable size = 10
0000000000
Run Code Online (Sandbox Code Playgroud) 我正在尝试提取.tar文件(从目录打包),然后检查解压缩目录中的文件名称.我正在使用tar-fs来提取tar文件,然后使用fs.createReadStream来操作数据.这是我到目前为止所得到的:
fs.createReadStream(req.files.file.path)
.pipe(tar.extract(req.files.file.path + '0'))
.on('error', function() {
errorMessage = 'Failed to extract file. Please make sure to upload a tar file.';
})
.on('entry', function(header, stream, callback) {
console.error(header);
stream.on('end', function() {
console.error("this is working");
});
})
.on('end', function() {
//the one did not get called
console.error('end');
})
;
Run Code Online (Sandbox Code Playgroud)
我希望提取整个文件夹,然后检查文件名.好吧,我还没到那么远......
根据我的理解,我在管道后面有一个可读的流.可读流有结束事件吗?我的问题是,为什么end代码中的事件没有被调用?
谢谢!
我有一个子节点进程,每24小时在CRON上运行一次.当进程开始时,它会读取一些排队的数据并将数据推送到某些转换流中.然后,该流充当反向多路复用器,并将流分成多个流,最终解析但它们都是异步的.
所有这些流完成后,我需要终止我创建的子进程.我的问题是,你怎么知道所有的流都完成了?
我曾尝试使用EventEmitter的'finish'事件,但是当第一个反向多路复用流完成时(好像是满口:) :)这似乎被捕获了.
基于承诺的方法.所以这种方法有效,但我认为有一种更简单的方法来做这样的事情.基本上,这导致为每个反向多路复用管道创建承诺,并且当每个管道完成时我们解决该承诺.然后,当所有承诺都已解决时,会触发一个事件,我们会在其他地方捕获该事件以终止该过程.
我正在考虑在nodejs中构建一个应用程序,它需要流式传输包含整数数组的大(> GB)文件.最重要的是,阵列需要以最佳方式进行序列化,因此不是基于ascii,理想情况下使用8位用于较小的整数(这将是绝大多数数据),但仍然能够代表更大的数字.
这个问题可能不仅仅是关于nodejs,而是如何在nodejs中解决这个问题?是否有现成的流式文件解决方案,其中包含来自磁盘的自定义字节编码?或者更好,整数数组?
理想情况下,即使使用ssd,流的每个部分的解码也应该是磁盘绑定而不是cpu绑定.
是否可以使用 Node.js 流构建 zip 存档并在创建该存档时通过对 HTTP GET 请求的响应将该 zip 存档提供给客户端/用户?我正在寻找一种解决方案,最好避免将整个 zip 缓冲到服务器上的内存中。
假设我创建了一个名为的转换流Parser,它可以像普通流一样写入,但作为对象流读取。我正在将readable事件用于使用此转换流的代码:
var parser = new Parser();
parser.on('readable', function () {
var data = parser.read();
console.log(data);
});
Run Code Online (Sandbox Code Playgroud)
在这个事件处理程序中,我必须重复调用parser.read()吗?或者,是否会readable针对从我的转换流中推送的每个对象自行触发?
我有我通过侦听处理流data,error和end事件,我调用一个函数来处理每一个data第一流中的事件.当然,处理数据的函数会调用其他回调,使其异步.那么当处理流中的数据时,如何开始执行更多代码?end在流中侦听事件并不意味着异步data处理功能已完成.
当我执行下一个语句时,如何确保完成流数据处理功能?
这是一个例子:
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
var self = this;
var promises = [];
accountStream
.on('data', function (account) {
migrateAccount.bind(self)(account, finishMigration);
})
.on('error', function (err) {
return console.log(err);
})
.on('end', function () {
console.log("Finished updating account stream (but finishMigration is still running!!!)");
callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
});
}
var migrateAccount = function (oldAccount, callback) {
executeSomeAction(oldAccount, function(err, newAccount) {
if (err) return console.log("error received:", …Run Code Online (Sandbox Code Playgroud) 我有一个可能已经或可能尚未结束的流。如果它已经结束,我不想永远坐在那里等待end事件。我如何检查?
我很好奇我的 PassThrough 流以及为什么在我通过管道将其关闭的资源后它没有关闭。我将它用作中介,一个资源需要一个 ReadableStream,我需要向用户传递一个 WriteableStream 以允许他们编写底层资源。起初,双工流看起来很理想,但需要一些实现,然后我找到了 PassThrough 流。
编辑:这里问题的最佳描述:https : //gist.github.com/four43/46fd38fd0c929b14deb6f1744b63026a
原始示例: 看看这个:
const fs = require('fs');
const stream = require('stream');
const passThrough = new stream.PassThrough({allowHalfOpen: false});
const writeStream = new fs.createWriteStream('/tmp/output.txt');
passThrough.pipe(writeStream)
.on('end', () => console.log('full-end'))
.on('close', () => console.log('full-close'))
.on('unpipe', () => console.log('full-unpipe'))
.on('finish', () => console.log('full-finish'));
passThrough
.on('end', () => console.log('passThrough-end'))
.on('close', () => console.log('passThrough-close'))
.on('unpipe', () => console.log('passThrough-unpipe'))
.on('finish', () => console.log('passThrough-finish'));
passThrough.end('hello world');
Run Code Online (Sandbox Code Playgroud)
实际输出:
passThrough-finish
passThrough-end
full-unpipe
full-finish
full-close
Run Code Online (Sandbox Code Playgroud)
似乎写端完成了它的工作,但 PassThrough 流的“读取”端不会传播关闭,即使“allowHalfOpen”选项被传递为 false(我可以验证调试器中的选项)。 …
我需要将可读流“暂停”一定秒数,然后再次恢复。可读流正在通过管道传输到转换流,所以我不能使用常规pause和resume方法,我不得不使用unpipe和pipe。在转换流中,我能够检测到pipe事件,然后unpipe在可读流上执行,然后在几秒钟后pipe再次执行以恢复它(我希望)。
这是代码:
import {Transform, Readable} from 'stream';
const alphaTransform = new class extends Transform {
constructor() {
super({
objectMode: true,
transform: (chunk: string | Buffer, encoding: string, callback: Function) => {
let transformed: IterableIterator<string>;
if (Buffer.isBuffer(chunk)) {
transformed = function* () {
for (const val of chunk) {
yield String.fromCharCode(val);
}
}();
} else {
transformed = chunk[Symbol.iterator]();
}
callback(null,
Array.from(transformed).map(s => s.toUpperCase()).join(''));
}
}); …Run Code Online (Sandbox Code Playgroud) node.js ×10
node.js-stream ×10
asynchronous ×2
javascript ×2
arrays ×1
ecmascript-6 ×1
promise ×1
stream ×1
tar ×1
typescript ×1
zip ×1