Node.js:分割n部分的流内容

kha*_*iuk 8 javascript node.js node.js-stream

我正在尝试了解节点流及其生命周期.所以,我想分割一个流的内容为n部分.下面的代码只是为了解释我的意图,并表明我已经自己尝试了一些东西.我省略了一些细节

我有一个流,它只生成一些数据(只是一系列数字):

class Stream extends Readable {
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    this.counter = 0
  }

  _read(size) {
    if(this.counter === 30) {
      this.push(null)
    } else {
      this.push(this.counter)
    }
    this.counter += 1
  }
}

const stream = new Stream()
stream.pause();
Run Code Online (Sandbox Code Playgroud)

试图占用下一个块的函数:

function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      var chunk;
      do {
        chunk = stream.read()
        if (_.isNull(chunk) || result.length > count) {
          stream.pause()
          break
        }
        result.push(chunk)
      } while(true)
      resolve(result)
    })
  })
}
Run Code Online (Sandbox Code Playgroud)

并希望像这样使用它:

take(stream, 3)
  .then(res => {
    assert.deepEqual(res, [1, 2, 3])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [4, 5, 6])
  })
Run Code Online (Sandbox Code Playgroud)

这样做的惯用方法是什么?

gue*_*314 7

使用ReadableStream您可以使用单个函数来检查当前数据块的元素是否等于预期结果.

创建的变量,CHUNK并且N,其中,CHUNK是元素切片或剪接从原始阵列的数量,N是通过递增变量CHUNK在每个.enqueue()内呼叫pull()呼叫.

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));
Run Code Online (Sandbox Code Playgroud)

使用链式 .then()

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(({value, done}) => {
  if ([1,2,3].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read()
  }
})
.then(({value, done}) => {
  if ([4,5,6].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // return `result`; `reader.closed` returns a `Promise`
    return reader.closed.then(() => result);
  }
})
.then(readComplete)
.catch(err => console.log(err));
Run Code Online (Sandbox Code Playgroud)

另请参阅Chrome内存问题 - 文件API + AngularJS


小智 -1

我认为这可以帮助你 - https://github.com/substack/stream-handbook

这是一本非常详细的手册,其中包含适用于各种流媒体场景的示例代码,我使用它作为我自己项目的参考,到目前为止发现它很有用!/examples中也有示例代码