Javascript(ES6)可迭代流

Jac*_*cob 9 javascript iterator stream ecmascript-6

是否有使用ES6生成器使流可迭代的模式?

请参阅下面的"MakeStreamIterable".

import {createReadStream} from 'fs'

let fileName = 'largeFile.txt'
let readStream = createReadStream(fileName, {
  encoding: 'utf8',
  bufferSize: 1024
})
let myIterableAsyncStream = MakeStreamIterable(readStream)

for (let data of myIterableAsyncStream) {
  let str = data.toString('utf8')
  console.log(str)
}
Run Code Online (Sandbox Code Playgroud)

我对co或bluebird的协程deasync阻塞不感兴趣.

黄金是MakeStreamIterable应该是一个有效的函数.

sdg*_*uck 8

是否有使用ES6生成器使流可迭代的模式?

不,这是无法实现的,因为发电机是同步的.他们必须知道他们正在屈服什么,什么时候.目前只能通过使用某种基于回调的实现来实现异步数据源的迭代.因此,MakeStreamIterable如果你的意思是'一个有效的函数,其结果可以被赋予一个for-of循环' ,就没有办法制作'一个有效的函数'.

流是异步的

流表示在可能无限长的时间内异步接收的潜在无限量数据.如果我们看一下MDN上迭代器定义,我们可以更详细地定义它是什么使它"无法"的流:

当一个对象知道如何一次访问一个集合中的项目,同时跟踪其在该序列中的当前位置时,它就是一个迭代器.在JavaScript中,迭代器是一个提供next()方法的对象,该方法返回序列中的下一个项目.此方法返回具有两个属性的对象:donevalue.

(重点是我自己的.)

让我们从这个定义中挑选出一个iterable的属性.对象必须......

  1. 知道如何一次访问一个集合中的项目;
  2. 能够跟踪其在数据序列中的当前位置;
  3. 并提供一个方法,该方法next检索具有保存value序列中的下一个或通知迭代的属性的对象done.

流不符合上述任何标准,因为......

  1. 它无法控制何时接收数据并且无法"展望未来"以找到下一个值;
  2. 只有当流已经关闭时,它才能知道何时或是否已收到所有数据;
  3. 并且它没有实现可迭代协议,因此不公开可以使用的next方法for-of.

______

假装(eration)

我们实际上无法迭代从流中接收的数据(绝对不使用a for-of),但是我们可以构建一个假装使用Promises(yay!)并在闭包内抽象出流的事件处理程序的接口.

// MakeStreamIterable.js
export default function MakeStreamIterable (stream) {
  let collection = []
  let index = 0
  let callback
  let resolve, reject

  stream
    .on('error', err => reject && reject(err))
    .on('end', () => resolve && resolve(collection))
    .on('data', data => {
      collection.push(data)

      try {
        callback && callback(data, index++)
      } catch (err) {
        this.end()
        reject(err)
      }
    })

  function each (cb) {
    if(callback) {
      return promise
    }

    callback = (typeof cb === 'function') ? cb : null

    if (callback && !!collection) {
        collection.forEach(callback)
        index = collection.length
    }

    return promise
  }

  promise = new Promise((res, rej) => {
    resolve = res
    reject = rej
  })

  promise.each = each

  return promise
}
Run Code Online (Sandbox Code Playgroud)

我们可以像这样使用它:

import {myIterableAsyncStream} from './MakeStreamIterable'

let myIterableAsyncStream = MakeStreamIterable(readStream)

myIterableAsyncStream
  .each((data, i) => {
    let str = data.toString('utf8')
    console.log(i, str)
  })
  .then(() => console.log('completed'))
  .catch(err => console.log(err))
Run Code Online (Sandbox Code Playgroud)

有关此实施的注意事项:

  • 没有必要each立即调用'可迭代流'.
  • each被调用时,所有值接收的传递给回调一个接一个前其呼叫forEach风格.之后,所有后续数据立即传递给回调.
  • 该函数返回一个Promise,它collection在流结束时解析数据的完整,这意味着each如果提供的迭代方法each不令人满意,我们实际上根本不需要调用.
  • 我培养了把它称为迭代器的错误语义,因此是一个可怕的人类.请向有关当局报告.


Edu*_*rdo 5

很快您将能够使用Async Iterators 和 Generators。在节点 9.8 中,您可以通过使用--harmony命令行选项运行来使用它。

async function* streamAsyncIterator(stream) {
  // Get a lock on the stream
  const reader = stream.getReader();

  try {
    while (true) {
      // Read from the stream
      const {done, value} = await reader.read();
      // Exit if we're done
      if (done) return;
      // Else yield the chunk
      yield value;
    }
  }
  finally {
    reader.releaseLock();
  }
}

async function example() {
  const response = await fetch(url);

  for await (const chunk of streamAsyncIterator(response.body)) {
    // …
  }
}
Run Code Online (Sandbox Code Playgroud)

感谢 Jake Archibald 提供上述示例