kha*_*iuk 8 javascript node.js node.js-stream
我正在尝试了解节点流及其生命周期.所以,我想分割一个流的内容为n部分.下面的代码只是为了解释我的意图,并表明我已经自己尝试了一些东西.我省略了一些细节
我有一个流,它只生成一些数据(只是一系列数字):
class Stream extends Readable {
constructor() {
super({objectMode: true, highWaterMark: 1})
this.counter = 0
}
_read(size) {
if(this.counter === 30) {
this.push(null)
} else {
this.push(this.counter)
}
this.counter += 1
}
}
const stream = new Stream()
stream.pause();
Run Code Online (Sandbox Code Playgroud)
试图占用下一个块的函数:
function take(stream, count) {
const result = []
return new Promise(function(resolve) {
stream.once('readable', function() {
var chunk;
do {
chunk = stream.read()
if (_.isNull(chunk) || result.length > count) {
stream.pause()
break
}
result.push(chunk)
} while(true)
resolve(result)
})
})
}
Run Code Online (Sandbox Code Playgroud)
并希望像这样使用它:
take(stream, 3)
.then(res => {
assert.deepEqual(res, [1, 2, 3])
return take(stream, 3)
})
.then(res => {
assert.deepEqual(res, [4, 5, 6])
})
Run Code Online (Sandbox Code Playgroud)
这样做的惯用方法是什么?
使用ReadableStream您可以使用单个函数来检查当前数据块的元素是否等于预期结果.
创建的变量,CHUNK并且N,其中,CHUNK是元素切片或剪接从原始阵列的数量,N是通过递增变量CHUNK在每个.enqueue()内呼叫pull()呼叫.
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
let N = 0;
const stream = new ReadableStream({
pull(controller) {
if (N < data.length)
// slice `N, N += CHUNK` elements from `data`
controller.enqueue(data.slice(N, N += CHUNK))
else
// if `N` is equal to `data.length` call `.close()` on stream
controller.close()
}
});
const reader = stream.getReader();
const processData = ({value, done}) => {
// if stream is closed return `result`; `reader.closed` returns a `Promise`
if (done) return reader.closed.then(() => result);
if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read().then(data => processData(data))
}
}
const readComplete = res => console.log(`result: [${res}]`);
reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));Run Code Online (Sandbox Code Playgroud)
使用链式 .then()
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
let N = 0;
const stream = new ReadableStream({
pull(controller) {
if (N < data.length)
// slice `N, N += CHUNK` elements from `data`
controller.enqueue(data.slice(N, N += CHUNK))
else
// if `N` is equal to `data.length` call `.close()` on stream
controller.close()
}
});
const reader = stream.getReader();
const processData = ({value, done}) => {
// if stream is closed return `result`; `reader.closed` returns a `Promise`
if (done) return reader.closed.then(() => result);
if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read().then(data => processData(data))
}
}
const readComplete = res => console.log(`result: [${res}]`);
reader.read()
.then(({value, done}) => {
if ([1,2,3].every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
return reader.read()
}
})
.then(({value, done}) => {
if ([4,5,6].every((n, index) => n === value[index])) {
console.log(`N: ${N}, value: [${value}]`)
result.push(...value);
// return `result`; `reader.closed` returns a `Promise`
return reader.closed.then(() => result);
}
})
.then(readComplete)
.catch(err => console.log(err));Run Code Online (Sandbox Code Playgroud)
另请参阅Chrome内存问题 - 文件API + AngularJS
小智 -1
我认为这可以帮助你 - https://github.com/substack/stream-handbook
这是一本非常详细的手册,其中包含适用于各种流媒体场景的示例代码,我使用它作为我自己项目的参考,到目前为止发现它很有用!/examples中也有示例代码