How to interleave/merge async iterables?

sdg*_*sdh 13 javascript iterator promise async-await

Say I have some async iterable objects like this:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
    await sleep(10000);
    throw new Error('You have gone too far! ');
  }, 
};

Now, suppose I can concatenate them like this:

const abcs = async function * () {
  yield * a;
  yield * b;
  yield * c;
};

The (first 9) items yielded will be:

(async () => {
  const limit = 9;
  let i = 0; 
  const xs = [];
  for await (const x of abcs()) {
    xs.push(x);
    i++;
    if (i === limit) {
      break;
    }
  }
  console.log(xs);
})().catch(error => console.error(error));

// [ 'a', 'b', 'c', 'i', 'j', 'k', 'x', 'y', 'z' ]

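But imagine that I do not care about the order, that a, b and c yield at different speeds, and that I want to yield each value as quickly as possible.

How can I rewrite this loop so that the xs are yielded as soon as they are available, ignoring order?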


It is also possible that a, b or c are infinite sequences, so the solution must not require all of the elements to be buffered into an array.

Ber*_*rgi 6

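There is no way to write this with a loop statement. async/await code always executes sequentially; to do things concurrently you need to use promise combinators directly. For plain promises there is Promise.all, but for async iterators there is nothing (yet), so we need to write it ourselves: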

async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

Note that combine does not support passing values into next, or cancellation through .throw or .return.
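
The finally block in combine relies on a property of the async iteration protocol: when a consumer leaves a for await loop early, the loop calls the iterator's return() method, and a suspended generator then runs its own finally block. A minimal sketch of that behaviour on a single generator, where `source` and `demo` are made-up names used only for this illustration:

```javascript
// Sketch: breaking out of `for await` calls the iterator's return() method,
// which resumes the suspended generator so that its `finally` block runs.
// `source` and `demo` are hypothetical names for this illustration.
async function* source(log) {
  try {
    yield 1;
    yield 2;
    yield 3; // never reached in this demo
  } finally {
    log.push('source cleaned up');
  }
}

async function demo() {
  const log = [];
  for await (const x of source(log)) {
    log.push(x);
    if (x === 2) break; // triggers source's return() and waits for it
  }
  return log;
}

demo().then(log => console.log(log));
// [ 1, 2, 'source cleaned up' ]
```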

You can call it like this:

(async () => {
  for await (const x of combine([a, b, c])) {
    console.log(x);
  }
})().catch(console.error);


T.J*_*der 6

If I change abcs to accept the generators to process, I come up with this; see the inline comments:

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};
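
The generator above has no cleanup path: if the consumer leaves the loop early, the remaining sources are never told to stop. One possible way to add that is sketched below. It assumes that silently ignoring errors raised during cleanup is acceptable, and it deliberately does not await the return() calls so that a slow source cannot block the exit. `mergeWithCleanup`, `counter` and `demo` are hypothetical names, not part of the answer's code.

```javascript
// Sketch only: a Map-based merge with a `finally` block for early exit.
async function* mergeWithCleanup(...gens) {
  const iterators = new Map(gens.map(gen => [gen, gen[Symbol.asyncIterator]()]));
  // Request the next result from one source, tagged with its key
  const queueNext = async key => ({ key, result: await iterators.get(key).next() });
  const sources = new Map(gens.map(gen => [gen, queueNext(gen)]));
  try {
    while (sources.size) {
      const { key, result } = await Promise.race(sources.values());
      if (result.done) {
        sources.delete(key);
      } else {
        sources.set(key, queueNext(key));
        yield result.value;
      }
    }
  } finally {
    // Runs on completion, on error, and when the consumer breaks out early.
    // Anything still in `sources` has not finished, so ask it to stop.
    for (const [key, pending] of sources) {
      pending.catch(() => {}); // assumption: abandoned next() errors are ignored
      const it = iterators.get(key);
      if (typeof it.return === 'function') {
        Promise.resolve(it.return()).catch(() => {}); // not awaited on purpose
      }
    }
  }
}

// An infinite source that records its name in `closed` when it is shut down.
const counter = (name, closed) => ({
  async *[Symbol.asyncIterator]() {
    try {
      for (let n = 0; ; n++) yield `${name}${n}`;
    } finally {
      closed.push(name);
    }
  },
});

async function demo() {
  const closed = [];
  const seen = [];
  for await (const value of mergeWithCleanup(counter('p', closed), counter('q', closed))) {
    seen.push(value);
    if (seen.length === 3) break;
  }
  // Give the un-awaited return() calls a chance to settle before inspecting
  await new Promise(resolve => setTimeout(resolve, 0));
  return { seen, closed };
}
```

Calling `demo()` takes three values from two infinite sources and then breaks; afterwards `closed` should contain both source names, which shows that each source's finally block ran.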

Live example:

// Promisified sleep function
const sleep = ms => new Promise((resolve, reject) => {
  setTimeout(() => resolve(ms), ms);
});

const a = {
  [Symbol.asyncIterator]: async function * () {
    yield 'a';
    await sleep(1000);
    yield 'b';
    await sleep(2000);
    yield 'c';
  }, 
};

const b = {
  [Symbol.asyncIterator]: async function * () {
    await sleep(6000);
    yield 'i';
    yield 'j';
    await sleep(2000);
    yield 'k';
  }, 
};

const c = {
  [Symbol.asyncIterator]: async function * () {
    yield 'x';
    await sleep(2000);
    yield 'y';
    await sleep(8000);
    yield 'z';
  }, 
};

const abcs = async function * (...gens) {
  // Worker function to queue up the next result
  const queueNext = async (e) => {
    e.result = null; // Release previous one as soon as possible
    e.result = await e.it.next();
    return e;
  };
  // Map the generators to source objects in a map, get and start their
  // first iteration
  const sources = new Map(gens.map(gen => [
    gen,
    queueNext({
      key: gen,
      it:  gen[Symbol.asyncIterator]()
    })
  ]));
  // While we still have any sources, race the current promise of
  // the sources we have left
  while (sources.size) {
    const winner = await Promise.race(sources.values());
    // Completed the sequence?
    if (winner.result.done) {
      // Yes, drop it from sources
      sources.delete(winner.key);
    } else {
      // No, grab the value to yield and queue up the next
      // Then yield the value
      const {value} = winner.result;
      sources.set(winner.key, queueNext(winner));
      yield value;
    }
  }
};

(async () => {
  console.log("start");
  for await (const x of abcs(a, b, c)) {
    console.log(x);
  }
  console.log("done");
})().catch(error => console.error(error));