如何将事件发射器用作异步生成器

Ove*_*9ck 9 events asynchronous generator node.js babeljs

我正在尝试使用带有 babel 的异步生成器的简洁语法(我被节点 8 卡住了),我想知道如何将事件发射器干净地转换为异步生成器

到目前为止我得到的看起来像这样

    const { EventEmitter } = require('events')

    // defer fonction for resolving promises out of scope
    const Defer = () => {
      let resolve
      let reject
      let promise = new Promise((a, b) => {
        resolve = a
        reject = b
      })
      return {
        promise,
        reject,
        resolve
      }
    }


    // my iterator function
    function readEvents(emitter, channel) {
      const buffer = [Defer()]
      let subId = 0

      emitter.on(channel, x => {
        const promise = buffer[subId]
        subId++
        buffer.push(Defer())
        promise.resolve(x)
      })

      const gen = async function*() {
        while (true) {
          const val = await buffer[0].promise
          buffer.shift()
          subId--
          yield val
        }
      }

      return gen()
    }

    async function main () {
      const emitter = new EventEmitter()
      const iterator = readEvents(emitter, 'data')

      // this part generates events
      let i = 0
      setInterval(() => {
        emitter.emit('data', i++)
      }, 1000)

      // this part reads events
      for await (let val of iterator) {
        console.log(val)
      }
    }

    main()
Run Code Online (Sandbox Code Playgroud)

这很笨拙 - 可以简化吗?

mpe*_*pen 6

我想出了这个:

async *stream<TRecord extends object=Record<string,any>>(query: SqlFrag): AsyncGenerator<TRecord> {
    const sql = query.toSqlString();

    let results: TRecord[] = [];
    let resolve: () => void;
    let promise = new Promise(r => resolve = r);
    let done = false;

    this.pool.query(sql)
        .on('error', err => {
            throw err;
        })
        .on('result', row => {
            results.push(row);
            resolve();
            promise = new Promise(r => resolve = r);
        })
        .on('end', () => {
            done = true;
        })

    while(!done) {
        await promise;
        yield* results;
        results = [];
    }
}
Run Code Online (Sandbox Code Playgroud)

到目前为止似乎工作。

即您像在 Khanh 的解决方案中创建了一个虚拟承诺,以便您可以等待第一个结果,但是由于许多结果可能同时出现,您将它们推入一个数组并重置承诺以等待结果(或批处理结果)。如果这个承诺在等待之前被覆盖数十次并不重要。

然后我们可以一次产生所有结果,yield*并为下一批刷新数组。